1. 程式人生 > >Reactive程式設計(三):一個簡單的HTTP服務

Reactive程式設計(三):一個簡單的HTTP服務

書接上文 Reactive程式設計 ,我們已經瞭解了基礎的API,現在我們開始編寫實際的應用。Reactive對併發程式設計進行了很好的抽象,也有很多底層的特性需要我們去關注。當使用這些特性時,我們可以對之前隱藏在容器、平臺、框架中的細節進行控制。

Spring MVC由阻塞轉向Reactive

Reactive要求我們以不同的思路來看待問題。區別於傳統的request->response模式,所有的資料都是釋出為一個序列 (Publisher) 然後進行訂閱(Subscriber)。區別於同步等待返回結果,改為註冊一個回撥。只要我們習慣了這種方式,就不會覺得很複雜。但是沒辦法讓整個環境都同時變為Reactive模式,所以避免不了要跟老式的阻塞API打交道。

假設我們有一個返回HttpStatus的阻塞方法:

private RestTemplate restTemplate = new RestTemplate();

private HttpStatus block(int value) {
    return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
            .getStatusCode();
}

我們需要傳遞不同的引數來重複呼叫這個方法,並對返回的結果進行處理。這是一個典型的 “scatter-gather”應用場景。例如從多個頁面中提取前N條資料。

這是一個採用錯誤方式 的例子:

Flux.range(1, 10) (1)
    .log()
    .map(this::block) (2)
    .collect(Result::new, Result::add) (3)
    .doOnSuccess(Result::stop) (4)
  1. 呼叫 10次介面
  2. 產生阻塞
  3. 將結果進行彙總後放入一個物件
  4. 最後結束處理 (結果是一個 Mono<Result>)

不要採用這種方式來編寫程式碼。這是一種錯誤的實現方式,這樣會阻塞住呼叫執行緒,這跟迴圈呼叫block()沒什麼區別。好的實現應該是把block()的呼叫放到工作執行緒中。我們可以採用一個返回Mono<HttpStatus>

的方法:

private Mono<HttpStatus> fetch(int value) {
    return Mono.fromCallable(() -> block(value)) (1)
        .subscribeOn(this.scheduler);            (2)
}
  1. 將阻塞呼叫放到一個 Callable
  2. 在工作執行緒中進行訂閱

scheduler 單獨定義為一個共享變數:


  Scheduler scheduler = Schedulers.parallel()

然後用 flatMap() 代替 map()


Flux.range(1, 10)
    .log()
    .flatMap(                             (1)
        this::fetch, 4)                   (2)
    .collect(Result::new, Result::add)
    .doOnSuccess(Result::stop)
  1. 在新的publisher中並行處理
  2. flatMap的並行引數

嵌入 Non-Reactive 服務

如果想將上面的程式碼放入一個servlet這種的Non-Reactive的服務中,可以使用Spring MVC:


@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
    return Flux.range(1, 10)
      ...
      .doOnSuccess(Result::stop)
      .toFuture();
}

在閱讀了 @RequestMapping 的javadoc以後,我們會發現這個方法會返回一個 CompletableFuture ,應用會選擇在單獨的執行緒中返回值。在我們的例子中這個單獨的執行緒由 scheduler 提供。

沒有免費的午餐

利用工作執行緒進行 scatter-gather 計算是一個好的模式,但是也不完美 - 沒有阻塞呼叫方,但還是阻塞了一些東西,只不過是把問題轉移了。我們有一個非阻塞IO的 HTTP 服務,將處理放入執行緒池,一個請求一個執行緒 - 這是servlet容器的機制(例如tomcat)。請求是非同步處理的,因此tomcat內部的工作執行緒沒有被阻塞,我們的 scheduler 會建立4個執行緒。在處理10個請求時,理論上處理效能會提高4倍。簡單來說,如果我們在單個執行緒中順序處理10個請求要花1000ms,我們所採用的方式只需250ms。

我們可以通過增加執行緒來進一步提升效能(分配16個執行緒):


private Scheduler scheduler = Schedulers.newParallel("sub", 16);

Tomcat 預設會分配100個執行緒來處理請求,當所有的請求同時處理時,我們的 scheduler 執行緒池會成為一個瓶頸。我們的 scheduler 執行緒池數量遠小於 Tomcat 的執行緒池數量。這說明效能調優不是一個簡單的事情,需要考慮各個引數和資源的匹配情況。

相比固定數量的執行緒池,我們可以採用更靈活的執行緒池,可以根據需要動態調整執行緒數量。Reactor 已經提供了這種機制,使用 Schedulers.elastic() 後可以看到當請求增多時,執行緒數量會隨之增加。

全面採用 Reactive

從阻塞呼叫到reactive的橋接是一種有效的模式,並且用Spring MVC的技術很容易實現。接下來我們將完全棄用阻塞模式,採用新的API和新的工具。最終我們實現全棧Reactive。

在我們的例子中,第一步先用 spring-boot-starter-web-reactive 替換 spring-boot-starter-web :

Maven:


<dependencies>
  <dependency>
   <groupId>org.springframework.boot.experimental</groupId>
     <artifactId>spring-boot-starter-web-reactive</artifactId>
  </dependency>
  ...
</dependencies>
    <dependencyManagement>
     <dependencies>
       <dependency>
         <groupId>org.springframework.boot.experimental</groupId>
         <artifactId>spring-boot-dependencies-web-reactive</artifactId>
         <version>0.1.0.M1</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
     </dependencies>
    </dependencyManagement>

Gradle:


dependencies {
    compile('org.springframework.boot.experimental:spring-boot-starter-web-reactive')
    ...
}
dependencyManagement {
    imports {
        mavenBom "org.springframework.boot.experimental:spring-boot-dependencies-web-reactive:0.1.0.M1"
    }
}

在controller中,不再使用 CompletableFuture 代之以返回一個 Mono :


@RequestMapping("/parallel")
public Mono<Result> parallel() {
    return Flux.range(1, 10)
            .log()
            .flatMap(this::fetch, 4)
            .collect(Result::new, Result::add)
            .doOnSuccess(Result::stop);
}

將這段程式碼放到SpringBoot應用中,可以執行在 Tomcat, Jetty 或者 Netty, 取決於classpath引入了哪個包。Tomcat 是預設的容器,如果想用別的容器,需要把 Tomcat 從 classpath 中去掉,然後引入其它的容器。這3個容器在啟動時間、記憶體使用和執行時資源上相差不大。

我們仍然呼叫 block() 阻塞服務介面,所以我們仍需在工作執行緒中訂閱以免阻塞呼叫方。我們也可以採用一個非阻塞的客戶端,例如用新的 WebClient 替換 RestTemplate :


private WebClient client = new WebClient(new ReactorHttpClientRequestFactory());

private Mono<HttpStatus> fetch(int value) {
    return this.client.perform(HttpRequestBuilders.get("http://example.com"))
            .extract(WebResponseExtractors.response(String.class))
            .map(response -> response.getStatusCode());
}

注意 WebClient.perform() 的返回值是一個轉換為 Mono<HttpStatus> 的Reactive型別,但是我們沒有訂閱它。訂閱的工作由框架來完成。

控制反轉

現在我們去掉 fetch() 呼叫之後的併發引數:


@RequestMapping("/netty")
public Mono<Result> netty() {
    return Flux.range(1, 10) (1)
        .log() //
        .flatMap(this::fetch) (2)
        .collect(Result::new, Result::add)
        .doOnSuccess(Result::stop);
}
  1. 進行10次呼叫
  2. 在新的publisher中並行處理

由於不再使用額外的訂閱執行緒,相比於阻塞與Reactive橋接模式中的程式碼簡潔了很多,現在是全面Reactive模式了。WebClient 返回一個 Mono ,在然而然的我們需要在轉換鏈中使用 flatMap() 。編寫這種程式碼是一個很好的體驗,易於理解便於維護。同時不再需要執行緒池和併發引數,也沒有了影響效能的魔法數字4 。效能取決於系統資源而不是應用的 執行緒控制。

應用可以執行在 Tomcat, Jetty 或 Netty 上. Tomcat 和 Jetty 的支援基於 Servlet 3.1 的非同步處理,受限於一個請求一個執行緒。而執行在 Netty 上則沒有這個限制。只要客戶端不阻塞,會 儘快的分發客戶端請求。由於Netty服務不是一個請求一個執行緒,因此不會使用大量的執行緒。

注意,很多應用的阻塞呼叫不只是HTTP,還有資料庫操作。當前很少的資料庫支援非阻塞的客戶端(除了 MongoDB 和 Couchbase)。執行緒池和 blocking-to-reactive 模式會長期存在。

仍然沒有免費的午餐

首先,我們的程式碼是宣告式的,不方便除錯,錯誤發生時不容易定位。使用原生的API,例如不通過Spring框架而直接使用Reactor,會使情況變的更糟,因為我們自己要做很多的錯誤處理,每次進行網路呼叫都要寫很多樣板程式碼。通過組合使用 Spring 和 Reactor 我們可以方便的檢視堆疊資訊和未捕獲的異常。由於執行的執行緒不受我們控制,因此在理解上會有困難。

其次,一旦編寫錯誤導致一個Reactive回撥被阻塞,在同一執行緒上的所有請求都會掛起。在servlet容器中,由於是一個請求一個執行緒,一個請求阻塞時,其它的請求不會受影響。而在Reactive中,一個請求被阻塞會導致所有請求的延遲都增加。

總結

在非同步處理中能夠控制所有的環節是非常好的:每一個層級都有執行緒池和佇列。我們可以使一些層級具有彈效能力,可以根據負載動態調整。但是這也是一種負擔,我們期望有更加簡潔的方式。可擴充套件性的分析結果趨向於減少多餘的執行緒,不要超出硬體資源的限制條件。

Reactive 不能解決所有問題的方案,事實上它本身不是一個方案,它只是促進了某一類問題的解決方案的產生。學習的成本、程式的調整、後續的維護成本可能遠大於其所帶來的益處。所以在是否使用 Reactive 這個問題上要非常謹慎。