1. 程式人生 > >提升不止一點點,Dubbo 3.0 預覽版詳細解讀

提升不止一點點,Dubbo 3.0 預覽版詳細解讀

Dubbo 自 2011 年 10 月 27 日開源後,已被許多非阿里系的公司使用,其中既有當當網、網易考拉等網際網路公司,也不乏中國人壽、青島海爾等大型傳統企業。更多使用者資訊,可以訪問Dubbo @GitHub,issue#1012: Wanted: who's using dubbo。

自去年 12 月開始,Dubbo 3.0 便已正式進入開發階段,並備受社群和廣大 Dubbo 使用者的關注,本文將為您詳細解讀 3.0 預覽版的新特性和新功能。

下面先解答一下兩個有意思的與 Dubbo 相關的疑問。

  • 為什麼 Dubbo 一開源就是 2.0 版本?之前是否存在 1.0 版本?

筆者曾做過 Dubbo 協議的適配相容,Dubbo 確實存在過 1.x 版本,而且從協議設計和模型設計上都與 2.0 的開源版本協議是完全不一樣的。下圖是關於 Dubbo 的發展路徑:

  • 阿里內部正在使用 Dubbo 開源版本嗎?

是的,非常確定,當前開源版本的 Dubbo 在阿里巴巴被廣泛使用,而阿里的電商核心部門是用的 HSF2.2 版本,這個版本是相容了 Dubbo 使用方式和 Remoting 協議。當然,我們現在正在做 HSF2.2 的升級,直接依賴開源版本的 Dubbo 來做核心的統一。所以,Dubbo 是得到大規模線上系統驗證的分散式服務框架,這一點毋容置疑。

Dubbo 3.0 預覽版的要點

Dubbo 3.0 在設計和功能上的新增支援和改進,主要是以下四方面:

  • Dubbo 核心之 Filter 鏈的非同步化

這裡要指出的是,3.0 中規劃的非同步去阻塞和 2.7 中提供的非同步是兩個層面的特性。2.7 中的非同步是建立在傳統 RPC 中 request – response 會話模型上的,而 3.0 中的非同步將會從通訊協議層面由下向上構建,關注的是跨程序、全鏈路的非同步問題。通過底層協議開始支援 streaming 方式,不單單可以支援多種會話模型,還可以在協議層面開始支援反壓、限流等特性,使得整個分散式體系更具有彈性。綜上所述,2.7 關注的非同步更侷限在點對點的非同步(一個 consumer 呼叫一個 provider),3.0 關注的非同步化,寬度上則關注整個呼叫鏈上的非同步,高度上則向上又可以包裝成 Rx 的程式設計模型。有趣的是,Spring 5.0 釋出了對 Flux 的支援,隨後開始解決跨程序的非同步問題。

  • 功能方面是 reactive(響應式)支援

最近幾年, reactive programming這個詞語的熱度迅速提升,Wikipedia 上的 reactive programming 解釋是 reactive programming is a programming paradigm oriented around data flows and the propagation of change. Dubbo3.0會實現Reactive Stream 的 rx 介面,從而能讓使用者享受到RP帶來的響應性提升,甚至面向 RP 的架構升級。當然,我們希望 reactive 不單單能夠帶來事件(event)驅動的應用整合方式的升級,也希望在 Load Balance(選擇最優的服務節點),fault tolerance(限流降級時最好做到自適應)等方面發揮其積極價值。

  • 雲原生/ ServiceMesh 方向的探索

我們定下的策略是進入 Envoy 社群來實現 Dubbo 融入 mesh 的理念思想,目前 Dubbo 協議已經被 Envoy 支援。當然,Dubbo Mesh 離真正可用還有很長一段距離,其在選址、負載均衡和服務治理方面的工作需要繼續在資料面建設,另外,控制面板的建設在社群也沒有提上日程。

  • 融合並支援阿里內部

Dubbo 3.0 定下了內外融合的策略,也就是說 3.0 的核心最終會在阿里巴巴的生產系統中部署,相信通過大流量、大規模的考驗,Dubbo 使用者可以獲得一個性能、穩定、服務治理實踐各方面俱佳的核心,使用者在生產系統中採用 3.0 也會更加放心。這一點也是 Dubbo 3.0 最重要的使命。

Filter 鏈的非同步化設計

Dubbo 最強大的一處設計是其在 Filter 鏈上的抽象設計,通過其擴充套件機制的開放性支援,使用者可以對 Dubbo 做功能增強,並允許各個擴充套件點被定製來是否保留。

Dubbo 的 Filter 定義如下:

@SPI
public interface Filter {

    /**
     * do invoke filter.
     * <p>
     * <code>
     * // before filter
     * Result result = invoker.invoke(invocation);
     * // after filter
     * return result;
     * </code>
     *
     * @param invoker    service
     * @param invocation invocation.
     * @return invoke result.
     * @throws RpcException
     * @see org.apache.dubbo.rpc.Invoker#invoke(Invocation)
     */
    Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;

}

按照“呼叫一個遠端服務的方法就像呼叫本地的方法一樣”這種說法,這個直接返回 Result 響應的方式是非常好的,用起來是簡單直接,問題是時代變換到了需要關注體驗,需要走 Reactive 響應式的時代,也回到基本點:invoke一個 invocation 需要經過網路在不同的程序處理,天然就是非同步的過程,也就是傳送請求(invocation)與接收響應(Result)本身是兩個不同的事件,是需要兩個過程方法來在 Filter 鏈處理。那麼如何改造這個關鍵的 SPI 呢?有兩種方案:

第一種,把 invoke 的返回值改成 CompletableFuture, 好處是一目瞭然,Result 不在建議同步獲取了;但基礎介面的簽名一改會導致程式碼改造量巨大,同時也會讓原有的 SPI 擴充套件不在支援。

第二種,Result 介面直接繼承 CompletationStage,是代表了響應的非同步計算。這樣能進避免第一種的劣勢。所以,3.0.0 Preview 版本對內部呼叫鏈路實現做了一次重構:基於 CompletableFuture 實現了框架內部的全非同步呼叫,而在外圍程式設計上,同時支援同步、非同步呼叫模式。

值得注意的是,此次重構僅限於框架內部實現,對使用方沒有任何影響即介面上保持完全相容。要了解 Dubbo 非同步 API 如何使用,請參考《如何基於 Dubbo 實現全非同步的呼叫鏈》(地址:http://dubbo.apache.org/zh-cn/blog/dubbo-new-async.html,這篇文章將著重對實現思路和原理做一些簡單介紹。此次重構的要點有:

  • 框架內部採用全非同步呼叫模型,僅在外圍做同步、非同步適配;
  • 內建Filter鏈支援非同步回撥;

基本工作流程

首先我們來看一個通用的跨網路非同步呼叫的執行緒模型:

通訊框架非同步傳送請求訊息,請求訊息傳送成功後,返回代表業務結果的 CompletableFuture 給業務執行緒。之後對於 Future 的處理,根據呼叫型別會有所區別:

  1. 對於同步請求(如上圖體現的場景),業務執行緒會呼叫 future.get 同步阻塞等待結果,當收到網路層返回的業務結果後,future.get 返回並最終將結果傳遞給呼叫發起方。
  2. 對於非同步請求,業務執行緒不會呼叫 future.get,而是將 future 儲存在呼叫上下文或者直接返回給呼叫者,同時會為 future 註冊回撥監聽器,以便當真正的業務結果從通訊層返回時監聽器可以對結果做進一步的處理。

接下來具體看一下一次非同步 Dubbo RPC 請求的呼叫流程:

  1. 消費方面向 Proxy 代理程式設計,發出呼叫請求,請求經過 Filter 鏈向下傳遞。
  2. Invoker.invoke() 將請求非同步轉發給網路層,並收到代表返回結果的 Future。
  3. Future 被包裝到 Result,轉而由 Result 代表這次遠端呼叫的結果(由於 Result 的非同步屬性,此時它可能並不包含真正的返回值)。
  4. Result 繼續沿著呼叫鏈返回,在經過每個 Filter 時,Filter 可選擇註冊 Listener 監聽器,以便在業務結果返回時執行結果預處理。
  5. 最終 Proxy 呼叫 result.recreate() 將結果返回給消費者:
  • 如果方法是 CompletableFuture 簽名,則返回 Future;
  • 如果方法是普通同步簽名,則返回物件預設值,Future 可通過 RpcContext 拿到;

6. 呼叫方在拿到代表非同步業務結果的 Future 後,可選擇註冊回撥監聽器,以監聽真正的業務結果返回。

同步呼叫和非同步呼叫基本上是一致的,並且也是走的回撥模式,只是在鏈路返回之前做了一次阻塞 get 呼叫,以確保在收到實際結果時再返回。Filter 在註冊 Listener 時由於 Future 已處於 complete 狀態,因此會同時觸發回撥 onResponse()/onError()。

關於流程圖中提到的 Result,Result 在 Dubbo 的一次 RPC 呼叫中代表返回結果,在 3.0 中 Result 自身增加了代表狀態的介面,類似 Future 現在 Result 可以代表一次未完成的呼叫。

要讓 Result 具備代表非同步返回結果的能力,有兩中方式來實現:

1. Result is a Future,在 Java 8 中更合理的方式是繼承 CompletionStage 介面。

public interface Result extends CompletionStage {


  }

2. 讓 Result 例項持有 Future 例項,與 1 的區別即是設計中選用“繼承”還是“組合”。

public class AsyncRpcResult implements Result {
     private CompletableFuture<RpcResult> resultFuture;
  }

同時,為了讓 Result 更直觀的體現其非同步結果的特性,也為了方便麵向 Result 介面程式設計,我們可以考慮為Result增加一些非同步介面:

public interface Result extends Serializable {

    Result thenApplyWithContext(Function<Result, Result> fn);

    <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn);

    Result get() throws InterruptedException, ExecutionException;

}

Filter SPI

Filter 是 Dubbo 預置的攔截器擴充套件 SPI,用來做請求的預處理、結果的後處理,框架本身內建了一些攔截器實現,而從使用者層面,我相信這個 SPI 也應該是被擴充套件最多的一個。在 3.0 版本中,Filter 迴歸單一職責的設計模式,將回調介面單獨提取到 Listener 中。

@SPI

public interface Filter {

    Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;

    interface Listener {

        void onResponse(Result result, Invoker<?> invoker, Invocation invocation);

        void onError(Throwable t, Invoker<?> invoker, Invocation invocation);

    }

}

以上是 Filter 的 SPI 定義,Filter 的核心定義中只有一個 invoke() 方法用來傳遞呼叫請求。

同時,增加了一個新的回撥介面 Listener,每個 Filter 實現可以定義自己的 Listenr 回撥器,從而實現對返回結果的非同步監聽,參考以下是為 MonitorFilter 增加的 Listener 回撥實現:

class MonitorListener implements Listener {
        @Override
        public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
                collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }
        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
                collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }
}

泛化呼叫非同步介面支援

為了更直觀的做非同步呼叫,泛化介面新增了 CompletableFuture<Object>$invokeAsync(Stringmethod,String[]parameterTypes,Object[]args)介面:

public interface GenericService {
    /**
     * Generic invocation
     *
     * @param method         Method name, e.g. findPerson. If there are overridden methods, parameter info is
     *                       required, e.g. findPerson(java.lang.String)
     * @param parameterTypes Parameter types
     * @param args           Arguments
     * @return invocation return value
     * @throws GenericException potential exception thrown from the invocation
     */
    Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;
    default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException {
        Object object = $invoke(method, parameterTypes, args);
        if (object instanceof CompletableFuture) {
            return (CompletableFuture<Object>) object;
        }
        return CompletableFuture.completedFuture(object);
    }
}

這樣,當我們想做非同步呼叫時,就可以直接這樣使用:

CompletableFuture<Object> genericService.$invokeAsync(method, parameterTypes, args);

更具體用例請參見《泛化呼叫示例》

https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-generic/dubbo-samples-generic-call

非同步與效能

組要注意的是,框架內部的非同步實現本身並不能提高單次呼叫的效能,相反,由於執行緒切換和回撥邏輯的存在,非同步反而可能會導致單次呼叫效能的下降,但是非同步帶來的優勢是能減少對資源的佔用,提升整個系統的併發程度和吞吐量,這點對於 RPC 這種需要處理網路延遲的場景非常適用。更多關於非同步化設計的好處,請參考其他非同步化原理介紹相關文章。

響應式程式設計支援

響應式程式設計讓開發者更方便地編寫高效能的非同步程式碼,很可惜,在之前很長一段時間裡,dubbo 並不支援響應式程式設計,簡單來說,dubbo 不支援在 rpc 呼叫時使用 Mono/Flux 這種流物件(reative-stream 裡流的概念),給使用者使用帶來了不便。(關於響應式程式設計更詳細的資訊請參見這裡:http://reactivex.io/)

RSocket 是一個開源的支援 reactive-stream 語義的網路通訊協議,他將 reative 語義的複雜邏輯封裝起來了,使得上層可以方便實現網路程式。(RSocket詳細資料請參見這裡:http://rsocket.io/)

dubbo 在 3.0.0-SNAPSHOT 版本里基於 RSocket 對響應式程式設計進行了簡單的支援,使用者可以在請求引數和返回值裡使用 Mono 和 Flux 型別的物件。下面我們給出使用範例,(範例原始碼可以在這裡獲取:https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket)

首先定義介面如下:

public interface DemoService {

    Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2);

    Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2);

}

然後實現該 demo 介面:

public class DemoServiceImpl implements DemoService {
    @Override
    public Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2) {
        return m1.zipWith(m2, new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) {
                return s+" "+s2;
            }
        });
    }
    @Override
    public Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2) {
        return f1.zipWith(f2, new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) {
                return s+" "+s2;
            }
        });
    }
}

然後配置並啟動服務端,注意協議名字填寫 rsocket:

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <!-- provider's application name, used for tracing dependency relationship -->
    <dubbo:application name="demo-provider"/>
    <!-- use registry center to export service -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
    <!-- use dubbo protocol to export service on port 20880 -->
    <dubbo:protocol name="rsocket" port="20890"/>
    <!-- service implementation, as same as regular local bean -->
    <bean id="demoService" class="org.apache.dubbo.samples.basic.impl.DemoServiceImpl"/>
    <!-- declare the service interface to be exported -->
    <dubbo:service interface="org.apache.dubbo.samples.basic.api.DemoService" ref="demoService"/>
</beans>
public class RsocketProvider {
    public static void main(String[] args) throws Exception {
        new EmbeddedZooKeeper(2181, false).start();
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-provider.xml"});
        context.start();
        System.in.read(); // press any key to exit
    }
}

然後配置並啟動消費者消費者如下, 注意協議名填寫 rsocket:

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),
    don't set it same as provider -->
    <dubbo:application name="demo-consumer"/>
    <!-- use registry center to discover service -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
    <!-- generate proxy for the remote service, then demoService can be used in the same way as the
    local regular interface -->
    <dubbo:reference id="demoService" check="true" interface="org.apache.dubbo.samples.basic.api.DemoService"/>
</beans>
public class RsocketConsumer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-consumer.xml"});
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
        while (true) {
            try {
                Mono<String> monoResult = demoService.requestMonoWithMonoArg(Mono.just("A"), Mono.just("B"));
                monoResult.doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println(s);
                    }
                }).block();
                Flux<String> fluxResult = demoService.requestFluxWithFluxArg(Flux.just("A","B","C"), Flux.just("1","2","3"));
                fluxResult.doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println(s);
                    }
                }).blockLast();
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
        }
    }
}

可以看到配置上除了協議名使用 rsocket 以外其他並沒有特殊之處。

實現原理

以前使用者並不能在引數或者返回值裡使用 Mono/Flux 這種流物件(reative-stream 裡的流的概念)。因為流物件自帶非同步屬性,當業務把流物件作為引數或者返回值傳遞給框架之後,框架並不能將流物件正確的進行序列化。

dubbo 基於 RSocket 實現了 reative 支援。RSocket 將 reative 語義的複雜邏輯封裝起來了,給上層提供了簡潔的抽象如下:

/**
   * Fire and Forget interaction model of {@code RSocket}.
   *
   * @param payload Request payload.
   * @return {@code Publisher} that completes when the passed {@code payload} is successfully
   *     handled, otherwise errors.
   */
  Mono<Void> fireAndForget(Payload payload);
  /**
   * Request-Response interaction model of {@code RSocket}.
   *
   * @param payload Request payload.
   * @return {@code Publisher} containing at most a single {@code Payload} representing the
   *     response.
   */
  Mono<Payload> requestResponse(Payload payload);
  /**
   * Request-Stream interaction model of {@code RSocket}.
   *
   * @param payload Request payload.
   * @return {@code Publisher} containing the stream of {@code Payload}s representing the response.
   */
  Flux<Payload> requestStream(Payload payload);
  /**
   * Request-Channel interaction model of {@code RSocket}.
   *
   * @param payloads Stream of request payloads.
   * @return Stream of response payloads.
   */
  Flux<Payload> requestChannel(Publisher<Payload> payloads);

我們只需要在此基礎上新增我們的 rpc 邏輯即可。

  • 從客戶端視角看,框架建立連線之後,只需要將請求資訊編碼到 Payload 裡,然後通過 requestStream 方法即可向服務端發起請求。
  • 從服務端視角看,rsocket 收到請求之後,會呼叫我們實現的 requestStream 方法,我們從 Payload 裡解碼得到請求資訊之後,呼叫業務方法,然後拿到 Flux 型別的返回值即可。
  • 需要注意的是業務返回值一般是 Flux,而 RSocket 要求的是 Flux,所以我們需要通過 map operator 攔截業務資料,將 BizDO 編碼為 Payload 才可以遞交給我 RSocket。而 RSocket 會負責資料的傳輸和 reative 語義的實現。

經過上面的分析,我們知道了 Dubbo 如何基於 RSocket 實現了響應式程式設計的支援。有了響應式程式設計支援,業務可以更加方便的實現非同步邏輯。

小結

當前 Dubbo 3.0 將提供具備當代特性(如響應性程式設計)的相關支援,同時汲取阿里內部 HSF 的設計長處來實現兩者的融合,當前預覽版的很多地方還在探討中,希望大家能夠積極反饋,我們都會虛心學習並參考。

Dubbo 3.0 sample @GitHub:

https://github.com/apache/incubator-dubbo-samples/tree/3.x

本文作者:

覃柳傑(花名:未宇)

Github ID: qinliujie,阿里巴巴中介軟體開發,Dubbo 開源專案 PMC,參與 HSF2.2和 Dubbo3.0 的設計和開發。

呂仁琦(花名:空冥)

Github ID: jefflv,阿里巴巴中介軟體開發,Dubbo 開源專案 commiter,參與了內部 HSF2.0 的設計和開發。

劉軍(花名:陸龜)

阿里巴巴中介軟體高階開發工程師,Apache Dubbo (Incubating)PPMC,深度參與 Dubbo 專案開發,主要貢獻者之一。

謝育能(花名:思邪)

阿里巴巴中介軟體開發,Dubbo 3.0 開源專案的響應式模組的負責人,參與了內部 HSF2.2 的設計和開發。

作者:中介軟體小哥

原文連結

本文為雲棲社群原創內容,未經