1. 程式人生 > >RxJava2.x初識

RxJava2.x初識

一,初識RxJava

(1),什麼是RxJava?

RxJava是 ReactiveX(ReactiveX推薦http://reactivex.io/) 在JVM上的一個實現,ReactiveX使用Observable序列組合非同步和基於事件的程式。RxJava是在ReactiveX的一個延伸,RxJava是輕量級的,RxJava只關注Observable的抽象和與之相關的高階函式。通俗一點,RxJava是一個程式設計模型,提供一致的程式設計介面,幫助我們處理非同步資料流。RxJava是以函式的響應方式體現。

(2),為什麼要使用RxJava?

從表面來講,RxJava編寫程式碼簡潔,注意簡潔不是簡單。程式碼量並不一定能減少,只是結構清晰,便於閱讀。其實最根本的是,RxJava可以靈活的處理資料流或事件,高效的處理執行緒建立和併發。

二,如何使用RxJava

(1),如果使用AndroidStudio,需要在gradle中新增

    //RxJava的依賴包
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    //RxAndroid的依賴包
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

(2),這裡以RxJava2.x版本介紹如何使用,建議讀者可以先看下RxJava1.x版本,這樣可以對比版本差異,有助於更好的理解RxJava。
(3),使用場景,通常在Android開發過程中,需要在後臺執行緒處理一些任務。比如:AsyncTask,ContentProvider,Services,網路請求等。使用AsyncTask會導致記憶體洩漏,ContentProvider和Services配置繁瑣。而RxJava就可以幫助我們解決這些問題。

三,RxJava觀察者模式

(1),概念:Android開發中,我們經常用到的點選事件就是觀察者模式。View就是被觀察者,OnClickListener 是觀察者,兩者通過setOnClickListener建立繫結關係(這裡簡單介紹,如果想了解Android更多開發模式,推薦書籍《Android原始碼設計模式》)

(2),Observable(被觀察者)/Flowable(被觀察者 RxJava2.x版本新增),Observer(觀察者)/Subscriber(觀察者),Subscribe(訂閱者)。觀察者和被觀察者通過訂閱建立繫結關係。

(3),RxJava2.X中, Observeable用於訂閱Observer ,是不支援背壓的,而 Flowable用於訂閱Subscriber ,是支援背壓(Backpressure)的。背壓是指:被觀察者傳送資料或事件的速度,遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低傳送速度的策略。關於背壓的概念推薦

http://blog.csdn.net/jdsjlzx/article/details/52717636

四,操作符

(1),create()
使用 Create操作符從頭開始建立一個Observable/Flowable,這個操作符傳遞一個接受觀察者作為引數的函式。

   //建立被觀察者
   Observable observable = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("Hello Word");
                e.onComplete();
            }
        });
   //建立觀察者     
   Observer observer = new Observer<String>() {
            //這是RxJava2.x新加入的方法,在訂閱之後傳送資料之前,而Disposable可用於取消訂閱
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");

            }

            @Override
            public void onNext(String s) {
                Log.e(TAG, s);
            }

            @Override
            public void onError(Throwable t) {
                Log.e(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        };
    //建立繫結關係   
    observable.subscribe(observer);

以上程式碼為Observable 實現方式 , 列印結果為 onSubscribe->Hello Word->onComplete。在這裡就可以看出,onSubscribe會在onNext之前呼叫,這裡可以在onSubscribe方法中做一些初始化的操作。

     Flowable flowable = Flowable.create(new FlowableOnSubscribe(){

            @Override
            public void subscribe(FlowableEmitter e) throws Exception {
                e.onNext("Hello Word");
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER);//背壓模式,不指定不支援背壓
     Subscriber subscriber = new Subscriber<String>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.e(TAG, "onSubscribe");
                s.request(1);//request(n)來告訴上游被觀察者傳送多少個數據
            }

            @Override
            public void onNext(String s) {
                Log.e(TAG, s);
            }

            @Override
            public void onError(Throwable t) {
                Log.e(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        };
    flowable.subscribe(subscriber);
  • 以上程式碼為Flowable 實現方式列印結果為onSubscribe->HelloWord->onComplete。這裡需要注意,如果有初始化操作要在request(n)之前執行。Flowable是支援背壓的,簡單來說,上游被觀察者會響應下游觀察者的資料請求,下游觀察者通過request(n)通知上游被觀察者傳送多少請求。這樣避免被觀察者傳送資料或事件的速度,遠快於觀察者處理資料或事件的速度,造成大量資料或事件堆積。

  • 被觀察者ObservableEmitter/FlowableEmitter,這裡指發射器的意思,可以發射onNext(),onError(),onComplete()。被觀察者可以傳送多個onNext(),觀察者也可以接收多個onNext()。被觀察者傳送了onComplete()/onError()之後事件繼續傳送,而觀察者接收到onComplete()之後不在繼續接收事件。被觀察者可以不傳送onComplete()/onError(),兩者必須是唯一併且互斥。

  • 觀察者Disposable ,這裡指丟棄的意思,呼叫它觀察者收不到事件,而對被觀察者沒有影響

(2),just()
將一個或多個物件轉換成發射這個或這些物件的一個Observable/Flowable

    Observable.just("Hello World", "Very Good").subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, s);
            }
        });
    Flowable.just("Hello World", "Very Good")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        Log.e(TAG, s);
                    }
                });

這裡有個Consumer,表示觀察者只關心onNext()事件,其他事件不管,因此可以這麼寫
(3),range()
建立一個發射指定範圍的整數序列的Observable/Flowable,可以指定範圍的起始和長度。接受兩個引數,一個是範圍的起始值,一個是範圍的資料的數目。

     Observable.range(3, 4).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, integer + "");
            }
        });
        //輸出3,4,5,6
     Flowable.range(2,4).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, integer + "");
            }
        });
        //輸出2,3,4,5

(4),flatMap()
將一個發射資料的Observable變換為多個Observables,然後將它們發射的資料合併後放進一個單獨的Observable。
如何使用flatMap(),例如:我們需要做一個商品分類,主商品(Goods)和子商品(Category)。

 new Thread(new Runnable() {
            @Override
            public void run() {
                //在子執行緒獲取資料
                List<Goods> list=//資料來源,所有商品,比如:鞋子類別中包含運動鞋,登山鞋,板鞋等
                for(//主商品,如:鞋子){
                    Log.e(TAG,"主商品名稱");
                    for(//子商品,如:運動鞋){
                            //主執行緒中更新UI
                            runOnUiThread(new Runnable() {
                                @Override
                                public void run() {
                                    Log.e(TAG,"子商品");
                                }
                            });
                    }
                }
            }
        }).start();

這段虛擬碼是我們常用的寫法,在子執行緒中獲取資料,主執行緒中更新UI,如果資料比較多,邏輯複雜程式碼可讀性當然會差。再看看flatMap()應該怎麼寫

 Flowable.fromIterable(goods).flatMap(new Function<Goods, Publisher<Category>>() {
            @Override
            public Publisher<Category> apply(Goods goods) throws Exception {
                return Flowable.fromIterable(goods.getList());
            }

        })
                .subscribeOn(Schedulers.io())//下邊會介紹
                .observeOn(AndroidSchedulers.mainThread())//下邊會介紹
                .subscribe(new Consumer<Category>() {
                    @Override
                    public void accept(Category category) throws Exception {
                        Log.e(TAG, category.getName());
                    }
                });

對比一下這種方式結構清晰了很多,和flatMap()一樣的還有一個操作符contactMap(),唯一的區別就是flatMap()是無序的,最後輸出的和原序列不一定相同。contactMap()是有序的,最後輸出的和原序列相同

有關操作符就介紹到這裡,如果想了解其他操作符可以進入http://reactivex.io/進行檢視

五,執行緒控制(Scheduler)

(1),對於Android開發來說,這也是最有用的地方。我們經常需要在程式中做一些耗時的操作,比如:網路請求,檔案操作,資料庫操作。通常我們都是在子執行緒中做一些耗時操作,在主執行緒中更新ui。
(2),Scheduler都有哪些有用的方法呢

  • Schedulers.newThread()
    總是啟用新執行緒,並在新執行緒執行操作
  • Schedulers.io()
    代表io操作的執行緒, 通常用於網路,讀寫檔案等io密集型的操作。和newThread()差不多,不過Schedulers.io()這裡內部用到了執行緒池,比newThread()效率更高
  • Schedulers.computation()
    代表CPU計算密集型的操作, 例如需要大量計算的操作
  • AndroidSchedulers.mainThread()
    代表Android的主執行緒

(3),當我們建立一個Observable/Flowable預設是在主執行緒中進行,如下程式碼:

 Flowable.just("Hello World", "Very Good")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        Log.e(TAG, Thread.currentThread().getName());
                    }
                });
        //列印結果 main

如上程式碼,不管是觀察者還是被觀察者,如果不指定Scheduler,它就是預設的執行緒。開發過程中往往我們需要指定執行緒,如何進行變換呢?

 Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "subscribe   " + Thread.currentThread().getName());
                e.onNext(1);
            }
        }, BackpressureStrategy.BUFFER);
        Consumer<Integer> consumer = new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept   " + Thread.currentThread().getName());
            }
        };
        flowable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer);
                //列印結果  subscribe   RxNewThreadScheduler-1 
                           accept   main

可以看出,被觀察者在子執行緒中進行,觀察者在主執行緒中進行,主要是因為

subscribeOn(Schedulers.newThread())
observeOn(AndroidSchedulers.mainThread())

subscribeOn()指定上游被觀察者的執行緒,observeOn()指定下游觀察者的執行緒

有關RxJava2.x就簡單介紹到這裡,如果有時間會做一個專案,系統的使用這個框架。本文並非完全原創,參考了一些文章加上自己的理解,簡化了一些內容,沒有涉及原始碼,對於初學者來說容易上手