提升不止一點點,Dubbo 3.0 預覽版詳細解讀
Dubbo 自 2011 年 10 月 27 日開源後,已被許多非阿里系的公司使用,其中既有噹噹網、網易考拉等互聯網公司,也不乏中國人壽、青島海爾等大型傳統企業。
自去年 12 月開始,Dubbo 3.0 便已正式進入開發階段,並備受社區和廣大 Dubbo 用戶的關注,本文將為您詳細解讀 3.0 預覽版的新特性和新功能。
歡迎大家關注我,需要更多Java面試資料和學習乾貨可以關注我的專欄【Tom貓的Java屋本專欄會長期更新java架構技術以及心得等精彩文章!
下面先解答一下兩個有意思的與 Dubbo 相關的疑問。
- 為什麼 Dubbo 一開源就是 2.0 版本?之前是否存在 1.0 版本?
- 筆者曾做過 Dubbo 協議的適配兼容,Dubbo 確實存在過 1.x 版本,而且從協議設計和模型設計上都與 2.0 的開源版本協議是完全不一樣的。下圖是關於 Dubbo 的發展路徑:
- 阿里內部正在使用 Dubbo 開源版本嗎?
- 是的,非常確定,當前開源版本的 Dubbo 在阿里巴巴被廣泛使用,而阿里的電商核心部門是用的 HSF2.2 版本,這個版本是兼容了 Dubbo 使用方式和 Remoting 協議。當然,我們現在正在做 HSF2.2 的升級,直接依賴開源版本的 Dubbo 來做內核的統一。所以,Dubbo 是得到大規模線上系統驗證的分散式服務框架,這一點毋容置疑。
Dubbo 3.0 預覽版的要點
Dubbo 3.0 在設計和功能上的新增支持和改進,主要是以下四方面:
- Dubbo 內核之 Filter 鏈的非同步化
- 這裡要指出的是,3.0 中規劃的非同步去阻塞和 2.7 中提供的非同步是兩個層面的特性。2.7 中的非同步是建立在傳統 RPC 中 request – response 會話模型上的,而 3.0 中的非同步將會從通訊協議層面由下向上構建,關注的是跨進程、全鏈路的非同步問題。通過底層協議開始支持 streaming 方式,不單單可以支持多種會話模型,還可以在協議層面開始支持反壓、限流等特性,使得整個分散式體系更具有彈性。綜上所述,2.7 關注的非同步更局限在點對點的非同步(一個 consumer 調用一個 provider),3.0 關注的非同步化,寬度上則關注整個調用鏈上的非同步,高度上則向上又可以包裝成 Rx 的編程模型。有趣的是,Spring 5.0 發布了對 Flux 的支持,隨後開始解決跨進程的非同步問題。
- 功能方面是 reactive(響應式)支持
- 最近幾年, reactive programming這個詞語的熱度迅速提升,Wikipedia 上的 reactive programming 解釋是 reactive programming is a programming paradigm oriented around data flows and the propagation of change. Dubbo3.0會實現Reactive Stream 的 rx 介面,從而能讓用戶享受到RP帶來的響應性提升,甚至面向 RP 的架構升級。當然,我們希望 reactive 不單單能夠帶來事件(event)驅動的應用集成方式的升級,也希望在 Load Balance(選擇最優的服務節點),fault tolerance(限流降級時最好做到自適應)等方面發揮其積極價值。
- 雲原生/ ServiceMesh 方向的探索
- 我們定下的策略是進入 Envoy 社區來實現 Dubbo 融入 mesh 的理念思想,目前 Dubbo 協議已經被 Envoy 支持。當然,Dubbo Mesh 離真正可用還有很長一段距離,其在選址、負載均衡和服務治理方面的工作需要繼續在數據面建設,另外,控制面板的建設在社區也沒有提上日程。
- 融合併支持阿里內部
- Dubbo 3.0 定下了內外融合的策略,也就是說 3.0 的核心最終會在阿里巴巴的生產系統中部署,相信通過大流量、大規模的考驗,Dubbo 用戶可以獲得一個性能、穩定、服務治理實踐各方面俱佳的核心,用戶在生產系統中採用 3.0 也會更加放心。這一點也是 Dubbo 3.0 最重要的使命。
Filter 鏈的非同步化設計
Dubbo 最強大的一處設計是其在 Filter 鏈上的抽象設計,通過其擴展機制的開放性支持,用戶可以對 Dubbo 做功能增強,並允許各個擴展點被定製來是否保留。
Dubbo 的 Filter 定義如下:
@SPI
public
interface
Filter
{
/**
* do invoke filter.
* <p>
* <code>
* // before filter
* Result result = invoker.invoke(invocation);
* // after filter
* return result;
* </code>
*
* @param invoker service
* @param invocation invocation.
* @return invoke result.
* @throws RpcException
* @see org.apache.dubbo.rpc.Invoker#invoke(Invocation)
*/
Result
invoke(
Invoker
<?> invoker,
Invocation
invocation)
throws
RpcException
;
}
按照「調用一個遠程服務的方法就像調用本地的方法一樣」這種說法,這個直接返回 Result 響應的方式是非常好的,用起來是簡單直接,問題是時代變換到了需要關注體驗,需要走 Reactive 響應式的時代,也回到基本點:invoke一個 invocation 需要經過網路在不同的進程處理,天然就是非同步的過程,也就是發送請求(invocation)與接收響應(Result)本身是兩個不同的事件,是需要兩個過程方法來在 Filter 鏈處理。那麼如何改造這個關鍵的 SPI 呢?有兩種方案:
第一種,把 invoke 的返回值改成 CompletableFuture, 好處是一目了然,Result 不在建議同步獲取了;但基礎介面的簽名一改會導致代碼改造量巨大,同時也會讓原有的 SPI 擴展不在支持。
第二種,Result 介面直接繼承 CompletationStage,是代表了響應的非同步計算。這樣能進避免第一種的劣勢。所以,3.0.0 Preview 版本對內部調用鏈路實現做了一次重構:基於 CompletableFuture 實現了框架內部的全非同步調用,而在外圍編程上,同時支持同步、非同步調用模式。
值得注意的是,此次重構僅限於框架內部實現,對使用方沒有任何影響即介面上保持完全兼容。要了解 Dubbo 非同步 API 如何使用,請參考《如何基於 Dubbo 實現全非同步的調用鏈》(地址:http://dubbo.apache.org/zh-cn/blog/dubbo-new-async.html),這篇文章將著重對實現思路和原理做一些簡單介紹。此次重構的要點有:
- 框架內部採用全非同步調用模型,僅在外圍做同步、非同步適配;
- 內置Filter鏈支持非同步回調;
基本工作流程
首先我們來看一個通用的跨網路非同步調用的線程模型:
通信框架非同步發送請求消息,請求消息發送成功後,返回代表業務結果的 CompletableFuture 給業務線程。之後對於 Future 的處理,根據調用類型會有所區別:
- 對於同步請求(如上圖體現的場景),業務線程會調用 future.get 同步阻塞等待結果,當收到網路層返回的業務結果後,future.get 返回並最終將結果傳遞給調用發起方。
- 對於非同步請求,業務線程不會調用 future.get,而是將 future 保存在調用上下文或者直接返回給調用者,同時會為 future 註冊回調監聽器,以便當真正的業務結果從通信層返回時監聽器可以對結果做進一步的處理。
接下來具體看一下一次非同步 Dubbo RPC 請求的調用流程:
- 消費方面向 Proxy 代理編程,發出調用請求,請求經過 Filter 鏈向下傳遞。
- Invoker.invoke() 將請求非同步轉發給網路層,並收到代表返回結果的 Future。
- Future 被包裝到 Result,轉而由 Result 代表這次遠程調用的結果(由於 Result 的非同步屬性,此時它可能並不包含真正的返回值)。
- Result 繼續沿著調用鏈返回,在經過每個 Filter 時,Filter 可選擇註冊 Listener 監聽器,以便在業務結果返回時執行結果預處理。
- 最終 Proxy 調用 result.recreate() 將結果返回給消費者:
- 如果方法是 CompletableFuture 簽名,則返回 Future;
- 如果方法是普通同步簽名,則返回對象默認值,Future 可通過 RpcContext 拿到;
6. 調用方在拿到代表非同步業務結果的 Future 後,可選擇註冊回調監聽器,以監聽真正的業務結果返回。
同步調用和非同步調用基本上是一致的,並且也是走的回調模式,只是在鏈路返回之前做了一次阻塞 get 調用,以確保在收到實際結果時再返回。Filter 在註冊 Listener 時由於 Future 已處於 complete 狀態,因此會同時觸發回調 onResponse()/onError()。
關於流程圖中提到的 Result,Result 在 Dubbo 的一次 RPC 調用中代表返回結果,在 3.0 中 Result 自身增加了代表狀態的介面,類似 Future 現在 Result 可以代表一次未完成的調用。
要讓 Result 具備代表非同步返回結果的能力,有兩中方式來實現:
1. Result is a Future,在 Java 8 中更合理的方式是繼承 CompletionStage 介面。
public
interface
Result
extends
CompletionStage
{
}
2. 讓 Result 實例持有 Future 實例,與 1 的區別即是設計中選用「繼承」還是「組合」。
public
class
AsyncRpcResult
implements
Result
{
private
CompletableFuture
<
RpcResult
> resultFuture;
}
同時,為了讓 Result 更直觀的體現其非同步結果的特性,也為了速食麵向 Result 介面編程,我們可以考慮為Result增加一些非同步介面:
public
interface
Result
extends
Serializable
{
Result
thenApplyWithContext(
Function
<
Result
,
Result
> fn);
<U>
CompletableFuture
<U> thenApply(
Function
<
Result
, ?
extends
U> fn);
Result
get()
throws
InterruptedException
,
ExecutionException
;
}
Filter SPI
Filter 是 Dubbo 預置的攔截器擴展 SPI,用來做請求的預處理、結果的後處理,框架本身內置了一些攔截器實現,而從用戶層面,我相信這個 SPI 也應該是被擴展最多的一個。在 3.0 版本中,Filter 回歸單一職責的設計模式,將回調介面單獨提取到 Listener 中。
@SPI
public
interface
Filter
{
Result
invoke(
Invoker
<?> invoker,
Invocation
invocation)
throws
RpcException
;
interface
Listener
{
void
onResponse(
Result
result,
Invoker
<?> invoker,
Invocation
invocation);
void
onError(
Throwable
t,
Invoker
<?> invoker,
Invocation
invocation);
}
}
以上是 Filter 的 SPI 定義,Filter 的核心定義中只有一個 invoke() 方法用來傳遞調用請求。
同時,增加了一個新的回調介面 Listener,每個 Filter 實現可以定義自己的 Listenr 回調器,從而實現對返回結果的非同步監聽,參考以下是為 MonitorFilter 增加的 Listener 回調實現:
class
MonitorListener
implements
Listener
{
@Override
public
void
onResponse(
Result
result,
Invoker
<?> invoker,
Invocation
invocation) {
if
(invoker.getUrl().hasParameter(
Constants
.MONITOR_KEY)) {
collect(invoker, invocation, result,
RpcContext
.getContext().getRemoteHost(),
Long
.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)),
false
);
getConcurrent(invoker, invocation).decrementAndGet();
// count down
}
}
@Override
public
void
onError(
Throwable
t,
Invoker
<?> invoker,
Invocation
invocation) {
if
(invoker.getUrl().hasParameter(
Constants
.MONITOR_KEY)) {
collect(invoker, invocation,
null
,
RpcContext
.getContext().getRemoteHost(),
Long
.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)),
true
);
getConcurrent(invoker, invocation).decrementAndGet();
// count down
}
}
}
泛化調用非同步介面支持
為了更直觀的做非同步調用,泛化介面新增了 CompletableFuture<Object>$invokeAsync(Stringmethod,String[]parameterTypes,Object[]args)介面:
public
interface
GenericService
{
/**
* Generic invocation
*
* @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is
* required, e.g. findPerson(java.lang.String)
* @param parameterTypes Parameter types
* @param args Arguments
* @return invocation return value
* @throws GenericException potential exception thrown from the invocation
*/
Object
$invoke(
String
method,
String
[] parameterTypes,
Object
[] args)
throws
GenericException
;
default
CompletableFuture
<
Object
> $invokeAsync(
String
method,
String
[] parameterTypes,
Object
[] args)
throws
GenericException
{
Object
object = $invoke(method, parameterTypes, args);
if
(object
instanceof
CompletableFuture
) {
return
(
CompletableFuture
<
Object
>) object;
}
return
CompletableFuture
.completedFuture(object);
}
}
這樣,當我們想做非同步調用時,就可以直接這樣使用:
CompletableFuture
<
Object
> genericService.$invokeAsync(method, parameterTypes, args);
更具體用例請參見《泛化調用示例》
https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-generic/dubbo-samples-generic-call
非同步與性能
組要注意的是,框架內部的非同步實現本身並不能提高單次調用的性能,相反,由於線程切換和回調邏輯的存在,非同步反而可能會導致單次調用性能的下降,但是非同步帶來的優勢是能減少對資源的佔用,提升整個系統的並發程度和吞吐量,這點對於 RPC 這種需要處理網路延遲的場景非常適用。更多關於非同步化設計的好處,請參考其他非同步化原理介紹相關文章。
響應式編程支持
響應式編程讓開發者更方便地編寫高性能的非同步代碼,很可惜,在之前很長一段時間裡,dubbo 並不支持響應式編程,簡單來說,dubbo 不支持在 rpc 調用時使用 Mono/Flux 這種流對象(reative-stream 里流的概念),給用戶使用帶來了不便。(關於響應式編程更詳細的信息請參見這裡:http://reactivex.io/)。
RSocket 是一個開源的支持 reactive-stream 語義的網路通信協議,他將 reative 語義的複雜邏輯封裝起來了,使得上層可以方便實現網路程序。(RSocket詳細資料請參見這裡:http://rsocket.io/)。
dubbo 在 3.0.0-SNAPSHOT 版本里基於 RSocket 對響應式編程進行了簡單的支持,用戶可以在請求參數和返回值里使用 Mono 和 Flux 類型的對象。下面我們給出使用範例,(範例源碼可以在這裡獲取:https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket)。
首先定義介面如下:
public
interface
DemoService
{
Mono
<
String
> requestMonoWithMonoArg(
Mono
<
String
> m1,
Mono
<
String
> m2);
Flux
<
String
> requestFluxWithFluxArg(
Flux
<
String
> f1,
Flux
<
String
> f2);
}
然後實現該 demo 介面:
public
class
DemoServiceImpl
implements
DemoService
{
@Override
public
Mono
<
String
> requestMonoWithMonoArg(
Mono
<
String
> m1,
Mono
<
String
> m2) {
return
m1.zipWith(m2,
new
BiFunction
<
String
,
String
,
String
>() {
@Override
public
String
apply(
String
s,
String
s2) {
return
s+
" "
+s2;
}
});
}
@Override
public
Flux
<
String
> requestFluxWithFluxArg(
Flux
<
String
> f1,
Flux
<
String
> f2) {
return
f1.zipWith(f2,
new
BiFunction
<
String
,
String
,
String
>() {
@Override
public
String
apply(
String
s,
String
s2) {
return
s+
" "
+s2;
}
});
}
}
然後配置並啟動服務端,注意協議名字填寫 rsocket:
<beans
xmlns:xsi
=
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo
=
"http://dubbo.apache.org/schema/dubbo"
xmlns
=
"http://www.springframework.org/schema/beans"
xsi:schemaLocation
=
"http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"
>
<!-- providers application name, used for tracing dependency relationship -->
<dubbo:application
name
=
"demo-provider"
/>
<!-- use registry center to export service -->
<dubbo:registry
address
=
"zookeeper://127.0.0.1:2181"
/>
<!-- use dubbo protocol to export service on port 20880 -->
<dubbo:protocol
name
=
"rsocket"
port
=
"20890"
/>
<!-- service implementation, as same as regular local bean -->
<bean
id
=
"demoService"
class
=
"org.apache.dubbo.samples.basic.impl.DemoServiceImpl"
/>
<!-- declare the service interface to be exported -->
<dubbo:service
interface
=
"org.apache.dubbo.samples.basic.api.DemoService"
ref
=
"demoService"
/>
</beans>
public
class
RsocketProvider
{
public
static
void
main(
String
[] args)
throws
Exception
{
new
EmbeddedZooKeeper
(
2181
,
false
).start();
ClassPathXmlApplicationContext
context =
new
ClassPathXmlApplicationContext
(
new
String
[]{
"spring/rsocket-provider.xml"
});
context.start();
System
.in.read();
// press any key to exit
}
}
然後配置並啟動消費者消費者如下, 注意協議名填寫 rsocket:
<beans
xmlns:xsi
=
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo
=
"http://dubbo.apache.org/schema/dubbo"
xmlns
=
"http://www.springframework.org/schema/beans"
xsi:schemaLocation
=
"http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"
>
<!-- consumers application name, used for tracing dependency relationship (not a matching criterion),
dont set it same as provider -->
<dubbo:application
name
=
"demo-consumer"
/>
<!-- use registry center to discover service -->
<dubbo:registry
address
=
"zookeeper://127.0.0.1:2181"
/>
<!-- generate proxy for the remote service, then demoService can be used in the same way as the
local regular interface -->
<dubbo:reference
id
=
"demoService"
check
=
"true"
interface
=
"org.apache.dubbo.samples.basic.api.DemoService"
/>
</beans>
public
class
RsocketConsumer
{
public
static
void
main(
String
[] args) {
ClassPathXmlApplicationContext
context =
new
ClassPathXmlApplicationContext
(
new
String
[]{
"spring/rsocket-consumer.xml"
});
context.start();
DemoService
demoService = (
DemoService
) context.getBean(
"demoService"
);
// get remote service proxy
while
(
true
) {
try
{
Mono
<
String
> monoResult = demoService.requestMonoWithMonoArg(
Mono
.just(
"A"
),
Mono
.just(
"B"
));
monoResult.doOnNext(
new
Consumer
<
String
>() {
@Override
public
void
accept(
String
s) {
System
.out.println(s);
}
}).block();
Flux
<
String
> fluxResult = demoService.requestFluxWithFluxArg(
Flux
.just(
"A"
,
"B"
,
"C"
),
Flux
.just(
"1"
,
"2"
,
"3"
));
fluxResult.doOnNext(
new
Consumer
<
String
>() {
@Override
public
void
accept(
String
s) {
System
.out.println(s);
}
}).blockLast();
}
catch
(
Throwable
throwable) {
throwable.printStackTrace();
}
}
}
}
可以看到配置上除了協議名使用 rsocket 以外其他並沒有特殊之處。
實現原理
以前用戶並不能在參數或者返回值里使用 Mono/Flux 這種流對象(reative-stream 里的流的概念)。因為流對象自帶非同步屬性,當業務把流對象作為參數或者返回值傳遞給框架之後,框架並不能將流對象正確的進行序列化。
dubbo 基於 RSocket 實現了 reative 支持。RSocket 將 reative 語義的複雜邏輯封裝起來了,給上層提供了簡潔的抽象如下:
/**
* Fire and Forget interaction model of {@code RSocket}.
*
* @param payload Request payload.
* @return {@code Publisher} that completes when the passed {@code payload} is successfully
* handled, otherwise errors.
*/
Mono
<
Void
> fireAndForget(
Payload
payload);
/**
* Request-Response interaction model of {@code RSocket}.
*
* @param payload Request payload.
* @return {@code Publisher} containing at most a single {@code Payload} representing the
* response.
*/
Mono
<
Payload
> requestResponse(
Payload
payload);
/**
* Request-Stream interaction model of {@code RSocket}.
*
* @param payload Request payload.
* @return {@code Publisher} containing the stream of {@code Payload}s representing the response.
*/
Flux
<
Payload
> requestStream(
Payload
payload);
/**
* Request-Channel interaction model of {@code RSocket}.
*
* @param payloads Stream of request payloads.
* @return Stream of response payloads.
*/
Flux
<
Payload
> requestChannel(
Publisher
<
Payload
> payloads);
我們只需要在此基礎上添加我們的 rpc 邏輯即可。
- 從客戶端視角看,框架建立連接之後,只需要將請求信息編碼到 Payload 里,然後通過 requestStream 方法即可向服務端發起請求。
- 從服務端視角看,rsocket 收到請求之後,會調用我們實現的 requestStream 方法,我們從 Payload 里解碼得到請求信息之後,調用業務方法,然後拿到 Flux 類型的返回值即可。
- 需要注意的是業務返回值一般是 Flux,而 RSocket 要求的是 Flux,所以我們需要通過 map operator 攔截業務數據,將 BizDO 編碼為 Payload 才可以遞交給我 RSocket。而 RSocket 會負責數據的傳輸和 reative 語義的實現。
經過上面的分析,我們知道了 Dubbo 如何基於 RSocket 實現了響應式編程的支持。有了響應式編程支持,業務可以更加方便的實現非同步邏輯。
小結
當前 Dubbo 3.0 將提供具備當代特性(如響應性編程)的相關支持,同時汲取阿里內部 HSF 的設計長處來實現兩者的融合,當前預覽版的很多地方還在探討中,希望大家能夠積極反饋,我們都會虛心學習並參考。
人過留名,雁過留聲,路過記得點個贊。
最後送福利了,現在私信我「資料」可以獲取Java工程化、高性能及分散式、高性能、高架構、性能調優、Spring、MyBatis、Netty源碼分析等多個知識點高級進階乾貨的相關視頻資料,還有spring和虛擬機等書籍掃描版,還有更多面試題等你來拿
分享給喜歡Java,喜歡編程,有夢想成為架構師的程序員們,希望能夠幫助到你們。
推薦閱讀: