響應式程式設計系列一《規約》
提升開發效率,降低維護成本一直是開發團隊永恆不變的宗旨。近兩年來國內的技術圈子中越來越多的開始提及ReactiveX,越來越多的應用和麵試中都會有ReactiveX,響應式程式設計中RxJava可謂如魚得水。
目錄
1. 背景
ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx,最初是LINQ的一個擴充套件,由微軟的架構師Erik Meijer領導的團隊開發,在2012年11月開源,Rx是一個程式設計模型,目標是提供一致的程式設計介面,幫助開發者更方便的處理非同步資料流,Rx庫支援.NET、JavaScript和C++,Rx近幾年越來越流行了,現在已經支援幾乎全部的流行程式語言了,Rx的大部分語言庫由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET,社群網站是
主要版本:RxJava 1.x(官方已宣佈停止維護),RxJava 2.x(全新的API)
2. 響應式程式設計是什麼
響應式程式設計是一種基於非同步資料流概念的程式設計模式。資料流就像一條河:它可以被觀測,被過濾,被操作,或者為新的消費者與另外一條流合併為一條新的流。響應式程式設計的一個關鍵概念是事件。事件可以被等待,可以觸發過程,也可以觸發其它事件。
Rx提供了一系列的操作符,你可以使用它們來過濾(filter)、選擇(select)、變換(transform)、結合(combine)和組合(compose)多個Observable,這些操作符讓執行和複合變得非常高效。
響應式流從2013年開始,作為提供非阻塞背壓的非同步流處理標準的倡議。它旨在解決處理元素流的問題——如何將元素流從釋出者傳遞到訂閱者,而不需要釋出者阻塞,或訂閱者有無限制的緩衝區或丟棄。
響應式流模型非常簡單——訂閱者向釋出者傳送多個元素的非同步請求。 釋出者向訂閱者非同步傳送多個或稍少的元素。響應式流在pull模型和push模型流處理機制之間動態切換。 當訂閱者較慢時,它使用pull模型,當訂閱者更快時使用push模型。
3. 優勢 & 代價
優勢
- 函式式風格:對可觀察資料流使用無副作用的輸入輸出函式,避免了程式裡錯綜複雜的狀態;
- 簡化程式碼:Rx的操作符通通常可以將複雜的難題簡化為很少的幾行程式碼(宣告式的組合這些序列);
- 非同步錯誤處理:傳統的try/catch沒辦法處理非同步計算,Rx提供了合適的錯誤處理機制;
- 輕鬆使用併發:Rx的Observables和Schedulers讓開發者可以擺脫底層的執行緒同步和各種併發問題;
代價
- 雖然複用執行緒有助於提高吞吐量,但一旦在某個回撥函式中執行緒被卡住,那麼這個執行緒上所有的請求都會被阻塞,最嚴重的情況,整個應用會被拖垮。
- 難以除錯。由於 Rx 強大的描述能力,在一個典型的 Rx 應用中,大部分程式碼都是以鏈式表示式的形式出現,比如flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe(),一旦出錯,你將很難定位到具體是哪個環節出了問題。所幸的是,Rx 框架一般都會提供一些工具方法來輔助進行除錯。
4. Reactive Streams規約
4.1 Publisher
是潛在無限數量的有序元素的生產者。 它根據收到的要求向當前訂閱者釋出(或傳送)元素
4.2 Subscriber
從釋出者那裡訂閱並接收元素。 釋出者向訂閱者傳送訂閱令牌(subscription token)。 使用訂閱令牌,訂閱者從釋出者哪裡請求多個元素。 當元素準備就緒時,釋出者向訂閱者傳送多個或更少的元素。 訂閱者可以請求更多的元素。 釋出者可能有多個來自訂閱者的元素待處理請求。
4.3 Subscription
表示訂閱者訂閱的一個釋出者的令牌。 當訂閱請求成功時,釋出者將其傳遞給訂閱者。 訂閱者使用訂閱令牌與釋出者進行互動,例如請求更多的元素或取消訂閱。
4.4 Processor
充當訂閱者和釋出者的處理階段。Processor<T,R>
訂閱型別T的資料元素,接收並轉換為型別R的資料,併發布變換後的資料,該介面繼承了Publisher
和Subscriber
介面。
5. 主流實現
5.1 Rx2.x
RxJava是響應式流的Java實現之一,而RxJava 2.0 已經按照Reactive-Streams specification規範完全的重寫了。
5.2 Reactive Stream
Reactor 2.0.0.RC1 於2015年02月19日由Pivotal RTI(Spring 框架發起者)釋出,支援 Reactive Stream,它的構架總覽:
注:上圖參考附錄Reactor指南中文版,Reactor1.x實在是不太出名,也是規範沒出來吧
Reactor 程式碼庫拆分成多個子模組,便於選擇所需功能,不受其他功能程式碼塊干擾。
下面舉例說明,為實現非同步目標,響應式技術和 Reactor 模組該如何搭配:
- Spring XD + Reactor-Net (Core/Stream): 使用 Reactor 作為 Sink/Source IO 驅動。
- Grails | Spring + Reactor-Stream (Core): 用 Stream 和 Promise 做後臺處理。
- Spring Data + Reactor-Bus (Core): 發射資料庫事件 (儲存/刪除/…)。
- Spring Integration Java DSL + Reactor Stream (Core): Spring 整合的微批量資訊通道。
- RxJavaReactiveStreams + RxJava + Reactor-Core: 融合富結構與高效非同步 IO 處理
- RxJavaReactiveStreams + RxJava + Reactor-Net (Core/Stream): 用 RxJava 做資料輸入,非同步 IO 驅動做傳輸。
與Rx2.x的差異:
RxJava |
reactor-stream |
說明 |
Observable |
reactor.rx.Stream |
Reactive Stream Publisher的實現 |
Operator |
reactor.rx.action.Action |
Reactive Stream Processor的實現 |
Observable with 1 data at most |
reactor.rx.Promise |
返回唯一結果的型別, Reactive Stream Processor實現並提供了可選的非同步分發功能。 |
Factory API (just, from…) |
reactor.rx.Streams |
和core模組的 data-focused 子類一樣, 返回 Stream |
Functional API (map, filter…) |
reactor.rx.Stream |
和core模組的data-focused 子類一樣, 返回Stream |
Schedulers |
reactor.core.Dispatcher, org.reactivestreams.Processor |
Reactor Stream計算無限制的共享Dispatcher或者有限的Processor的操作。 |
Observable.observeOn() |
Stream.dispatchOn() |
只是dispatcher引數的一個適配命名。 |
5.3 Java9
JDK 9 java.util.concurrent 包提供了兩個主要的 API 來處理響應流:
- Flow
- SubmissionPublisher
5.4 Spring WebFlux
Spring WebFlux 是 Spring 5 的一個新模組,包含了響應式 HTTP 和 WebSocket 的支援,在容器中 Spring WebFlux 會將輸入流適配成 Mono 或者 Flux 格式進行統一處理。,另外在上層服務端支援兩種不同的程式設計模型: - 基於 Spring MVC 註解 @Controller 等 - 基於 Functional 函式式路由
為啥只能執行在 Servlet 3.1+ 容器?3.1 規範其中一個新特性是非同步處理支援。
5.5 Vertx
Vert.x是一個非同步無阻塞的網路框架,其參照物是node.js。基本上node.js能幹的事情,Vert.x都能幹。Vert.x利用Netty4的EventLoop來做單執行緒的事件迴圈,所以跑在Vert.x上的業務不能做CPU密集型的運算,這樣會導致整個執行緒被阻塞。
Vert.x目前是見過功能最強大(core、web、Data access、reacive、microservices、MQTT),第三方庫依賴最少的Java框架,它只依賴Netty4以及Jacskon,另外如果你需要建立分散式的Vert.x則再依賴HazelCast這個分散式框架,注意Vert.x3必須基於Java8。
總結,響應式程式設計已經慢慢成我我們開發中的主流,後續將帶您深入瞭解《rxjava》