1. 程式人生 > 程式設計 >ReactiveX使用介紹

ReactiveX使用介紹

ReactiveX是一個用觀察者模式開發非同步基於事件程式設計的庫。

它擴充套件了觀察者模式,支援流式的資料和事件,然後增加一些操作符可以用於靈活的處理流式資料與事件。其內建了很多不同型別的操作符,並且宣告式的寫法,讓我們可以專注開發業務而無需考慮很多底層的執行緒,併發,非阻塞操作。

概念

什麼是流式事件?

從事件維度上看,就是在不同時間依次發生的事件。比如說依次點選螢幕,如下可以看做流式的事件。stream of event或者event flow

img

為什麼使用觀察者模式(Observable)?

觀察者模型,讓我們在處理非同步事件流時更加簡單,就是提高效率。可以減少寫很多的回撥操作

單條資料(single items) 多條資料(multi items)
同步/synchronous T getData() Iterable getData()
非同步/asynchronous Future getData() Observable getData()

如上圖可以看出來,JDK本身沒有提供對多條資料的非同步支援。比如非同步的返回,A,B,C三個東西,順序不做要求,那麼一般就要寫三個Future。如果三個資料在同一個執行緒內處理,就是同步的。

Observable提供了一種理想化的方式來訪問非同步的流式資料(asynchronous sequences of multiple items

)

Observables可以進行組合

Java的Future是一種直接的方式來處理單個非同步的操作,但是如果巢狀使用Future的時候,在非同步裡面繼續非同步Future,程式碼就容易變得複雜不易維護。

組合多個Future進行非同步化的條件執行下一步操作時,寫法比較難優化。如果使用了Future.get可以更容易預測到結果,但是又過早的進入了阻塞。

ReactiveX Observables意在組合流式的非同步資料。

Observables更加靈活

ReactiveX Observables不僅僅支援單個的數值,同時支援無限的資料流,無限資料流即永遠都一直有新的資料出現,無法處理到最後。

上面提到了Observables是一種理想的方式處理非同步的多個資料。其餘同步的集合迭代器對比如下:

資料(event) Iterable (pull) Observable (push)
獲取資料 T next() onNext(T)
發現錯誤 throws Exception onError(Exception)
完成處理 !hasNext() onCompleted()

相比於同步集合的pull資料,Observable是非同步的push資料。

  • 使用Iterable,當沒有資料到達的時候會進入阻塞狀態。
  • 使用Observable,只要有資料可用的時候,就將資料推送給consumer,不管值是有還是沒有,是同步還是非同步。

Observable在經典的觀察者模式中添加了兩個新的語法,功能更強大靈活,並且也使得其寫法與Iterable保持統一,讓我們寫Observable的時候就像在寫Iterable:

  • 當生產者資料被消耗完畢的時候,Observable呼叫Observer的onCompleted方法
  • 當消費者在遇到錯誤或異常的時候,Observable呼叫Observer的onError方法

再看下Observable與Iterable的寫法對比: 寫法上沒太大的變化,這裡的Iterable可以看做Java 8中的Stream。

image-20191212072820313

Observable的靈活還有其不關心其資料來源的如何實現,不管是使用執行緒池,actors,event loop或者其它內容,在Observable看來都是非同步的。

使用ReactiveX時,可以修改底層的Observable實現而不需要修改Observable的Consumer程式碼。

使用

ReactiveX更像是一種理念,常見的程式語言都有關於ReactiveX的實現。以Java的實現RxJava為例。

寫一個簡單的Hello World案例:

1 引入Maven。

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.2.12</version>
        </dependency>
複製程式碼

2 編寫Rxjava相關程式碼

public class RxHello {
    public static void main(String[] args) {
        // 1. 建立Observable
        Flowable<String> observable = Flowable
                .just("hello","world")
                // 2. 使用Operator
                .map(String::toUpperCase)
                ;

        //3. 建立Observer
        Consumer<String> consumer = new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("收到資料:" + s);
            }
        };

        // 4. 建立訂閱關係
        observable.subscribe(consumer);
    }
}    
複製程式碼

上面程式碼看起來比較簡單,實際上寫的時候也確實比較簡單,主要步驟就三步:加上操作符可以說是4步

1 建立Observable,ReactiveX提供了一些不同的Operator來建立,具體有沒有實現要到對應的程式語言中看:

// 以下operator是可以建立
`Create`,`Defer`,`Empty`/`Never`/`Throw`,`From`,`Interval`,`Just`,`Range`,`Repeat`,`Start`,and `Timer
			
			// 比如:
        Observable.create();
        Observable.defer();
        Observable.just();
        Observable.fromArray()
複製程式碼

2 進行Observable的資料處理,內建了非常多的操作符。轉換,過濾,統計,錯誤處理等等

常見的Map,FlatMap,Filter,Take,Skip等

3 建立Observer,即觀察者或者消費者,用於接收Observable的資料

常見的類:FlowableSubscriber,Subscriber,Comsumer,主要是我們的程式碼邏輯。

4 建立Observable和Observer的關係。

最後

這裡只是介紹了ReactiveX的概念,介紹關鍵非同步流式資料,與觀察者模式部分,提供了案例程式碼,瞭解ReactiveX的基本寫法。

想要提要到ReactiveX的強大,還需要掌握其豐富的Operator操作符功能。

參考:

reactivex.io/intro.html

reactivex.io/documentati…

medium.com/@andrestalt…