Reactive程式設計(三):一個簡單的HTTP服務
書接上文 Reactive程式設計 ,我們已經瞭解了基礎的API,現在我們開始編寫實際的應用。Reactive對併發程式設計進行了很好的抽象,也有很多底層的特性需要我們去關注。當使用這些特性時,我們可以對之前隱藏在容器、平臺、框架中的細節進行控制。
Spring MVC由阻塞轉向Reactive
Reactive要求我們以不同的思路來看待問題。區別於傳統的request->response模式,所有的資料都是釋出為一個序列 (Publisher) 然後進行訂閱(Subscriber)。區別於同步等待返回結果,改為註冊一個回撥。只要我們習慣了這種方式,就不會覺得很複雜。但是沒辦法讓整個環境都同時變為Reactive模式,所以避免不了要跟老式的阻塞API打交道。
假設我們有一個返回HttpStatus的阻塞方法:
private RestTemplate restTemplate = new RestTemplate();
private HttpStatus block(int value) {
return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
.getStatusCode();
}
我們需要傳遞不同的引數來重複呼叫這個方法,並對返回的結果進行處理。這是一個典型的 “scatter-gather”應用場景。例如從多個頁面中提取前N條資料。
這是一個採用錯誤方式 的例子:
Flux.range(1, 10) (1)
.log()
.map(this::block) (2)
.collect(Result::new, Result::add) (3)
.doOnSuccess(Result::stop) (4)
- 呼叫 10次介面
- 產生阻塞
- 將結果進行彙總後放入一個物件
- 最後結束處理 (結果是一個
Mono<Result>
)
不要採用這種方式來編寫程式碼。這是一種錯誤的實現方式,這樣會阻塞住呼叫執行緒,這跟迴圈呼叫block()沒什麼區別。好的實現應該是把block()
的呼叫放到工作執行緒中。我們可以採用一個返回Mono<HttpStatus>
private Mono<HttpStatus> fetch(int value) {
return Mono.fromCallable(() -> block(value)) (1)
.subscribeOn(this.scheduler); (2)
}
- 將阻塞呼叫放到一個
Callable
中 - 在工作執行緒中進行訂閱
scheduler
單獨定義為一個共享變數:
Scheduler scheduler = Schedulers.parallel()
然後用 flatMap()
代替 map()
Flux.range(1, 10)
.log()
.flatMap( (1)
this::fetch, 4) (2)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop)
- 在新的publisher中並行處理
- flatMap的並行引數
嵌入 Non-Reactive 服務
如果想將上面的程式碼放入一個servlet這種的Non-Reactive的服務中,可以使用Spring MVC:
@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
return Flux.range(1, 10)
...
.doOnSuccess(Result::stop)
.toFuture();
}
在閱讀了 @RequestMapping
的javadoc以後,我們會發現這個方法會返回一個 CompletableFuture
,應用會選擇在單獨的執行緒中返回值。在我們的例子中這個單獨的執行緒由 scheduler
提供。
沒有免費的午餐
利用工作執行緒進行 scatter-gather 計算是一個好的模式,但是也不完美 - 沒有阻塞呼叫方,但還是阻塞了一些東西,只不過是把問題轉移了。我們有一個非阻塞IO的 HTTP 服務,將處理放入執行緒池,一個請求一個執行緒 - 這是servlet容器的機制(例如tomcat)。請求是非同步處理的,因此tomcat內部的工作執行緒沒有被阻塞,我們的 scheduler
會建立4個執行緒。在處理10個請求時,理論上處理效能會提高4倍。簡單來說,如果我們在單個執行緒中順序處理10個請求要花1000ms,我們所採用的方式只需250ms。
我們可以通過增加執行緒來進一步提升效能(分配16個執行緒):
private Scheduler scheduler = Schedulers.newParallel("sub", 16);
Tomcat 預設會分配100個執行緒來處理請求,當所有的請求同時處理時,我們的 scheduler 執行緒池會成為一個瓶頸。我們的 scheduler 執行緒池數量遠小於 Tomcat 的執行緒池數量。這說明效能調優不是一個簡單的事情,需要考慮各個引數和資源的匹配情況。
相比固定數量的執行緒池,我們可以採用更靈活的執行緒池,可以根據需要動態調整執行緒數量。Reactor 已經提供了這種機制,使用 Schedulers.elastic()
後可以看到當請求增多時,執行緒數量會隨之增加。
全面採用 Reactive
從阻塞呼叫到reactive的橋接是一種有效的模式,並且用Spring MVC的技術很容易實現。接下來我們將完全棄用阻塞模式,採用新的API和新的工具。最終我們實現全棧Reactive。
在我們的例子中,第一步先用 spring-boot-starter-web-reactive
替換 spring-boot-starter-web
:
Maven:
<dependencies>
<dependency>
<groupId>org.springframework.boot.experimental</groupId>
<artifactId>spring-boot-starter-web-reactive</artifactId>
</dependency>
...
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot.experimental</groupId>
<artifactId>spring-boot-dependencies-web-reactive</artifactId>
<version>0.1.0.M1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
Gradle:
dependencies {
compile('org.springframework.boot.experimental:spring-boot-starter-web-reactive')
...
}
dependencyManagement {
imports {
mavenBom "org.springframework.boot.experimental:spring-boot-dependencies-web-reactive:0.1.0.M1"
}
}
在controller中,不再使用 CompletableFuture
代之以返回一個 Mono
:
@RequestMapping("/parallel")
public Mono<Result> parallel() {
return Flux.range(1, 10)
.log()
.flatMap(this::fetch, 4)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop);
}
將這段程式碼放到SpringBoot應用中,可以執行在 Tomcat, Jetty 或者 Netty, 取決於classpath引入了哪個包。Tomcat 是預設的容器,如果想用別的容器,需要把 Tomcat 從 classpath 中去掉,然後引入其它的容器。這3個容器在啟動時間、記憶體使用和執行時資源上相差不大。
我們仍然呼叫 block()
阻塞服務介面,所以我們仍需在工作執行緒中訂閱以免阻塞呼叫方。我們也可以採用一個非阻塞的客戶端,例如用新的 WebClient
替換 RestTemplate
:
private WebClient client = new WebClient(new ReactorHttpClientRequestFactory());
private Mono<HttpStatus> fetch(int value) {
return this.client.perform(HttpRequestBuilders.get("http://example.com"))
.extract(WebResponseExtractors.response(String.class))
.map(response -> response.getStatusCode());
}
注意 WebClient.perform()
的返回值是一個轉換為 Mono<HttpStatus>
的Reactive型別,但是我們沒有訂閱它。訂閱的工作由框架來完成。
控制反轉
現在我們去掉 fetch()
呼叫之後的併發引數:
@RequestMapping("/netty")
public Mono<Result> netty() {
return Flux.range(1, 10) (1)
.log() //
.flatMap(this::fetch) (2)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop);
}
- 進行10次呼叫
- 在新的publisher中並行處理
由於不再使用額外的訂閱執行緒,相比於阻塞與Reactive橋接模式中的程式碼簡潔了很多,現在是全面Reactive模式了。WebClient
返回一個 Mono
,在然而然的我們需要在轉換鏈中使用 flatMap()
。編寫這種程式碼是一個很好的體驗,易於理解便於維護。同時不再需要執行緒池和併發引數,也沒有了影響效能的魔法數字4 。效能取決於系統資源而不是應用的 執行緒控制。
應用可以執行在 Tomcat, Jetty 或 Netty 上. Tomcat 和 Jetty 的支援基於 Servlet 3.1 的非同步處理,受限於一個請求一個執行緒。而執行在 Netty 上則沒有這個限制。只要客戶端不阻塞,會 儘快的分發客戶端請求。由於Netty服務不是一個請求一個執行緒,因此不會使用大量的執行緒。
注意,很多應用的阻塞呼叫不只是HTTP,還有資料庫操作。當前很少的資料庫支援非阻塞的客戶端(除了 MongoDB 和 Couchbase)。執行緒池和 blocking-to-reactive 模式會長期存在。
仍然沒有免費的午餐
首先,我們的程式碼是宣告式的,不方便除錯,錯誤發生時不容易定位。使用原生的API,例如不通過Spring框架而直接使用Reactor,會使情況變的更糟,因為我們自己要做很多的錯誤處理,每次進行網路呼叫都要寫很多樣板程式碼。通過組合使用 Spring 和 Reactor 我們可以方便的檢視堆疊資訊和未捕獲的異常。由於執行的執行緒不受我們控制,因此在理解上會有困難。
其次,一旦編寫錯誤導致一個Reactive回撥被阻塞,在同一執行緒上的所有請求都會掛起。在servlet容器中,由於是一個請求一個執行緒,一個請求阻塞時,其它的請求不會受影響。而在Reactive中,一個請求被阻塞會導致所有請求的延遲都增加。
總結
在非同步處理中能夠控制所有的環節是非常好的:每一個層級都有執行緒池和佇列。我們可以使一些層級具有彈效能力,可以根據負載動態調整。但是這也是一種負擔,我們期望有更加簡潔的方式。可擴充套件性的分析結果趨向於減少多餘的執行緒,不要超出硬體資源的限制條件。
Reactive 不能解決所有問題的方案,事實上它本身不是一個方案,它只是促進了某一類問題的解決方案的產生。學習的成本、程式的調整、後續的維護成本可能遠大於其所帶來的益處。所以在是否使用 Reactive 這個問題上要非常謹慎。