響應式程式設計系列(一):什麼是響應式程式設計?reactor入門
響應式程式設計 系列文章目錄
(二)Flux入門學習:流的概念,特性和基本操作
(三)Flux深入學習:流的高階特性和進階用法
(四)reactor-core響應式api如何測試和除錯?
(五)Spring reactive: Spring WebFlux的使用
(六)Spring reactive: webClient的使用
引言
Spring framework 5 的一大新特性:響應式程式設計(Reactive Programming)。那麼什麼是響應式?他能給我們帶來什麼?如何優雅地使用?本系列會從最基礎的概念和簡單的api講起,再慢慢深入探討響應式的一些高階特性,最後講解實戰內容,例如WebFlux和WebClient等在Spring boot中的使用,如何測試和除錯。
想要了解原理的話,美團點評的這篇部落格 Java NIO淺析 非常適合入門。
簡單地說:
當我們呼叫socket.read()、socket.write()這類阻塞函式的時候,這類函式不能立即返回,也無法中斷,需要等待socket可讀或者可寫,才會返回,因此一個執行緒只能處理一個請求。在這等待的過程中,cpu並不幹活,(即阻塞住了),那麼cpu的資源就沒有很好地利用起來。因此對於這種情況,我們使用多執行緒來提高cpu資源的利用率:在等待的這段時間,就可以切換到別的執行緒去處理事件,直到socket可讀或可寫了,通過中斷訊號通知cpu,再切換回來繼續處理資料。例如執行緒A正在等待socket可讀,而執行緒B已經就緒了,那麼就可以先切換到執行緒B去處理。雖然上下文切換也會花一些時間,但是遠比阻塞線上程A這裡空等要好。當然計算機內部實際的情況比這複雜得多。
而NIO的讀寫函式可以立刻返回,這就給了我們不開執行緒利用CPU的最好機會:如果一個連線不能讀寫(socket.read()返回0或者socket.write()返回0),我們可以把這件事記下來。因此只需要一個執行緒不斷地輪詢這些事件,一旦有就緒的時間,處理即可。不需要多執行緒。
阻塞型IO
- 需要多執行緒,即需要很大的執行緒池。
- 每個請求都要有一個單獨的執行緒去處理。
非阻塞型IO
- 只需要數量非常少的執行緒。
- 固定的幾個工作執行緒去處理事件。
使用NIO我們能得到什麼?
- 事件驅動模型
- 避免多執行緒
- 單執行緒處理多工
- 非阻塞I/O,I/O讀寫不再阻塞,而是返回0
- 基於block的傳輸,通常比基於流的傳輸更高效
- 更高階的IO函式,zero-copy
- IO多路複用大大提高了Java網路應用的可伸縮性和實用性
響應式程式設計入門
響應式程式設計就是基於reactor的思想,當你做一個帶有一定延遲的才能夠返回的io操作時,不會阻塞,而是立刻返回一個流,並且訂閱這個流,當這個流上產生了返回資料,可以立刻得到通知並呼叫回撥函式處理資料。
基本模型
我們首先需要理解響應式程式設計的基本模型:
Flux
Reactor中的釋出者(Publisher)由Flux和Mono兩個類定義,它們都提供了豐富的操作符(operator)。一個Flux物件代表一個包含0..N個元素的響應式序列,元素可以是普通物件、資料庫查詢的結果、http響應體,甚至是異常。而一個Mono物件代表一個包含零/一個(0..1)元素的結果。上圖就是一個Flux型別的資料流,Flux往流上傳送了3個元素,Subscriber通過訂閱這個流來接收通知。
如何建立一個流?最簡單的方式有以下幾種:
//建立一個流,並直接往流上釋出一個值為value資料 Flux.just(value); //通過list建立一個流,往流上依次釋出list中的資料 Flux.fromIterable(list); //建立一個流,並向流上從i開始連續釋出n個數據,資料型別為Integer Flux.range(i, n); //建立一個流,並定時向流上釋出一個數據,資料從0開始遞增,資料型別為Long Flux.interval(Duration.ofSeconds(n));
既然是“資料流”的釋出者,Flux和Mono都可以發出三種“資料訊號”:元素值、錯誤訊號、完成訊號,錯誤訊號和完成訊號都是終止訊號,完成訊號用於告知下游訂閱者該資料流正常結束,錯誤訊號終止資料流的同時將錯誤傳遞給下游訂閱者。
Subscriber
subscriber是一個訂閱者,他只有非常簡單的4個介面:
public interface Subscriber<T> { void onSubscribe(Subscription var1); //收到下一個元素值訊號時的行為 void onNext(T var1); //收到錯誤訊號時的行為 void onError(Throwable var1); //收到終止訊號時的行為 void onComplete(); }
Subscriber必須要訂閱一個Flux才能夠接收通知:
flux.subscribe( value -> handleData(value), error -> handleError(error), () -> handleComplete() );
上面這個例子通過lambda表示式,定義了Subscriber分別在收到訊息,收到錯誤,和訊息流結束時的行為,當Subscriber接收到一個新資料,就會非同步地執行handleData方法處理資料。
簡單例子:
接下來我們建立幾個最簡單的流來試一下:
首先我們新建一個maven專案,引入reactor的類庫:
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.2.3.RELEASE</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <version>3.2.3.RELEASE</version> <scope>test</scope> </dependency> </dependencies>
編寫程式碼如下:
public class ReactorTests { @After public void after() { sleep(30_000); } @Test public void testJust() { Flux.just("hello", "world") .subscribe(System.out::println); } @Test public void testList() { List<String> words = Arrays.asList( "hello", "reactive", "world" ); Flux.fromIterable(words) .subscribe(System.out::println); } @Test public void testRange() { Flux.range(1, 10) .subscribe(System.out::println); } @Test public void testInterval() { Flux.interval(Duration.ofSeconds(1)) .subscribe(System.out::println); } }
訂閱這些流,收到資料之後只是簡單地把它打印出來,執行這些Test,就能夠看到訂閱者在接收到流上的資料時,非同步地去處理這些資料。