ReactiveX使用介紹
ReactiveX是一個用觀察者模式
開發非同步
和基於事件程式設計
的庫。
它擴充套件了觀察者模式,支援流式的資料和事件,然後增加一些操作符可以用於靈活的處理流式資料與事件。其內建了很多不同型別的操作符,並且宣告式的寫法,讓我們可以專注開發業務而無需考慮很多底層的執行緒,併發,非阻塞操作。
概念
什麼是流式事件?
從事件維度上看,就是在不同時間依次發生的事件。比如說依次點選螢幕,如下可以看做流式的事件。stream of event
或者event flow
為什麼使用觀察者模式(Observable)?
觀察者模型,讓我們在處理非同步事件流
時更加簡單,就是提高效率。可以減少寫很多的回撥操作
單條資料(single items) | 多條資料(multi items) | |
---|---|---|
同步/synchronous | T getData() |
Iterable getData() |
非同步/asynchronous | Future getData() |
Observable getData() |
如上圖可以看出來,JDK本身沒有提供對多條資料的非同步支援。比如非同步的返回,A,B,C三個東西,順序不做要求,那麼一般就要寫三個Future。如果三個資料在同一個執行緒內處理,就是同步的。
Observable提供了一種理想化的方式來訪問非同步的流式資料(asynchronous sequences of multiple items
)
Observables可以進行組合
Java的Future是一種直接的方式來處理單個非同步的操作,但是如果巢狀使用Future的時候,在非同步裡面繼續非同步Future,程式碼就容易變得複雜不易維護。
組合多個Future進行非同步化的條件執行下一步操作時,寫法比較難優化。如果使用了Future.get可以更容易預測到結果,但是又過早的進入了阻塞。
ReactiveX Observables意在組合流式的非同步資料。
Observables更加靈活
ReactiveX Observables不僅僅支援單個的數值,同時支援無限的資料流,無限資料流即永遠都一直有新的資料出現,無法處理到最後。
上面提到了Observables是一種理想的方式處理非同步的多個資料。其餘同步的集合迭代器對比如下:
資料(event) | Iterable (pull) | Observable (push) |
---|---|---|
獲取資料 | T next() |
onNext(T) |
發現錯誤 | throws Exception
|
onError(Exception) |
完成處理 | !hasNext() |
onCompleted() |
相比於同步集合的pull資料,Observable是非同步的push資料。
- 使用Iterable,當沒有資料到達的時候會進入阻塞狀態。
- 使用Observable,只要有資料可用的時候,就將資料推送給consumer,不管值是有還是沒有,是同步還是非同步。
Observable在經典的觀察者模式中添加了兩個新的語法,功能更強大靈活,並且也使得其寫法與Iterable保持統一,讓我們寫Observable的時候就像在寫Iterable:
- 當生產者資料被消耗完畢的時候,Observable呼叫Observer的
onCompleted
方法 - 當消費者在遇到錯誤或異常的時候,Observable呼叫Observer的
onError
方法
再看下Observable與Iterable的寫法對比: 寫法上沒太大的變化,這裡的Iterable可以看做Java 8中的Stream。
Observable的靈活還有其不關心其資料來源的如何實現,不管是使用執行緒池,actors,event loop或者其它內容,在Observable看來都是非同步的。
使用ReactiveX時,可以修改底層的Observable實現而不需要修改Observable的Consumer程式碼。
使用
ReactiveX更像是一種理念,常見的程式語言都有關於ReactiveX的實現。以Java的實現RxJava為例。
寫一個簡單的Hello World案例:
1 引入Maven。
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.12</version>
</dependency>
複製程式碼
2 編寫Rxjava相關程式碼
public class RxHello {
public static void main(String[] args) {
// 1. 建立Observable
Flowable<String> observable = Flowable
.just("hello","world")
// 2. 使用Operator
.map(String::toUpperCase)
;
//3. 建立Observer
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("收到資料:" + s);
}
};
// 4. 建立訂閱關係
observable.subscribe(consumer);
}
}
複製程式碼
上面程式碼看起來比較簡單,實際上寫的時候也確實比較簡單,主要步驟就三步:加上操作符可以說是4步
1 建立Observable,ReactiveX提供了一些不同的Operator來建立,具體有沒有實現要到對應的程式語言中看:
// 以下operator是可以建立
`Create`,`Defer`,`Empty`/`Never`/`Throw`,`From`,`Interval`,`Just`,`Range`,`Repeat`,`Start`,and `Timer
// 比如:
Observable.create();
Observable.defer();
Observable.just();
Observable.fromArray()
複製程式碼
2 進行Observable的資料處理,內建了非常多的操作符。轉換,過濾,統計,錯誤處理等等
常見的Map,FlatMap,Filter,Take,Skip等
3 建立Observer,即觀察者或者消費者,用於接收Observable的資料
常見的類:FlowableSubscriber,Subscriber,Comsumer,主要是我們的程式碼邏輯。
4 建立Observable和Observer的關係。
最後
這裡只是介紹了ReactiveX的概念,介紹關鍵非同步流式資料,與觀察者模式部分,提供了案例程式碼,瞭解ReactiveX的基本寫法。
想要提要到ReactiveX的強大,還需要掌握其豐富的Operator操作符功能。
參考: