RxJava2.x初識
一,初識RxJava
(1),什麼是RxJava?
RxJava是 ReactiveX(ReactiveX推薦http://reactivex.io/) 在JVM上的一個實現,ReactiveX使用Observable序列組合非同步和基於事件的程式。RxJava是在ReactiveX的一個延伸,RxJava是輕量級的,RxJava只關注Observable的抽象和與之相關的高階函式。通俗一點,RxJava是一個程式設計模型,提供一致的程式設計介面,幫助我們處理非同步資料流。RxJava是以函式的響應方式體現。
(2),為什麼要使用RxJava?
從表面來講,RxJava編寫程式碼簡潔,注意簡潔不是簡單。程式碼量並不一定能減少,只是結構清晰,便於閱讀。其實最根本的是,RxJava可以靈活的處理資料流或事件,高效的處理執行緒建立和併發。
二,如何使用RxJava
(1),如果使用AndroidStudio,需要在gradle中新增
//RxJava的依賴包
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
//RxAndroid的依賴包
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
(2),這裡以RxJava2.x版本介紹如何使用,建議讀者可以先看下RxJava1.x版本,這樣可以對比版本差異,有助於更好的理解RxJava。
(3),使用場景,通常在Android開發過程中,需要在後臺執行緒處理一些任務。比如:AsyncTask,ContentProvider,Services,網路請求等。使用AsyncTask會導致記憶體洩漏,ContentProvider和Services配置繁瑣。而RxJava就可以幫助我們解決這些問題。
三,RxJava觀察者模式
(1),概念:Android開發中,我們經常用到的點選事件就是觀察者模式。View就是被觀察者,OnClickListener 是觀察者,兩者通過setOnClickListener建立繫結關係(這裡簡單介紹,如果想了解Android更多開發模式,推薦書籍《Android原始碼設計模式》)
(2),Observable(被觀察者)/Flowable(被觀察者 RxJava2.x版本新增),Observer(觀察者)/Subscriber(觀察者),Subscribe(訂閱者)。觀察者和被觀察者通過訂閱建立繫結關係。
(3),RxJava2.X中, Observeable用於訂閱Observer ,是不支援背壓的,而 Flowable用於訂閱Subscriber ,是支援背壓(Backpressure)的。背壓是指:被觀察者傳送資料或事件的速度,遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低傳送速度的策略。關於背壓的概念推薦
四,操作符
(1),create()
使用 Create操作符從頭開始建立一個Observable/Flowable,這個操作符傳遞一個接受觀察者作為引數的函式。
//建立被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello Word");
e.onComplete();
}
});
//建立觀察者
Observer observer = new Observer<String>() {
//這是RxJava2.x新加入的方法,在訂閱之後傳送資料之前,而Disposable可用於取消訂閱
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError");
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
};
//建立繫結關係
observable.subscribe(observer);
以上程式碼為Observable 實現方式 , 列印結果為 onSubscribe->Hello Word->onComplete。在這裡就可以看出,onSubscribe會在onNext之前呼叫,這裡可以在onSubscribe方法中做一些初始化的操作。
Flowable flowable = Flowable.create(new FlowableOnSubscribe(){
@Override
public void subscribe(FlowableEmitter e) throws Exception {
e.onNext("Hello Word");
e.onComplete();
}
}, BackpressureStrategy.BUFFER);//背壓模式,不指定不支援背壓
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe");
s.request(1);//request(n)來告訴上游被觀察者傳送多少個數據
}
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError");
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
};
flowable.subscribe(subscriber);
以上程式碼為Flowable 實現方式列印結果為onSubscribe->HelloWord->onComplete。這裡需要注意,如果有初始化操作要在request(n)之前執行。Flowable是支援背壓的,簡單來說,上游被觀察者會響應下游觀察者的資料請求,下游觀察者通過request(n)通知上游被觀察者傳送多少請求。這樣避免被觀察者傳送資料或事件的速度,遠快於觀察者處理資料或事件的速度,造成大量資料或事件堆積。
被觀察者ObservableEmitter/FlowableEmitter,這裡指發射器的意思,可以發射onNext(),onError(),onComplete()。被觀察者可以傳送多個onNext(),觀察者也可以接收多個onNext()。被觀察者傳送了onComplete()/onError()之後事件繼續傳送,而觀察者接收到onComplete()之後不在繼續接收事件。被觀察者可以不傳送onComplete()/onError(),兩者必須是唯一併且互斥。
- 觀察者Disposable ,這裡指丟棄的意思,呼叫它觀察者收不到事件,而對被觀察者沒有影響
(2),just()
將一個或多個物件轉換成發射這個或這些物件的一個Observable/Flowable
Observable.just("Hello World", "Very Good").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, s);
}
});
Flowable.just("Hello World", "Very Good")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
Log.e(TAG, s);
}
});
這裡有個Consumer,表示觀察者只關心onNext()事件,其他事件不管,因此可以這麼寫
(3),range()
建立一個發射指定範圍的整數序列的Observable/Flowable,可以指定範圍的起始和長度。接受兩個引數,一個是範圍的起始值,一個是範圍的資料的數目。
Observable.range(3, 4).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
//輸出3,4,5,6
Flowable.range(2,4).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
//輸出2,3,4,5
(4),flatMap()
將一個發射資料的Observable變換為多個Observables,然後將它們發射的資料合併後放進一個單獨的Observable。
如何使用flatMap(),例如:我們需要做一個商品分類,主商品(Goods)和子商品(Category)。
new Thread(new Runnable() {
@Override
public void run() {
//在子執行緒獲取資料
List<Goods> list=//資料來源,所有商品,比如:鞋子類別中包含運動鞋,登山鞋,板鞋等
for(//主商品,如:鞋子){
Log.e(TAG,"主商品名稱");
for(//子商品,如:運動鞋){
//主執行緒中更新UI
runOnUiThread(new Runnable() {
@Override
public void run() {
Log.e(TAG,"子商品");
}
});
}
}
}
}).start();
這段虛擬碼是我們常用的寫法,在子執行緒中獲取資料,主執行緒中更新UI,如果資料比較多,邏輯複雜程式碼可讀性當然會差。再看看flatMap()應該怎麼寫
Flowable.fromIterable(goods).flatMap(new Function<Goods, Publisher<Category>>() {
@Override
public Publisher<Category> apply(Goods goods) throws Exception {
return Flowable.fromIterable(goods.getList());
}
})
.subscribeOn(Schedulers.io())//下邊會介紹
.observeOn(AndroidSchedulers.mainThread())//下邊會介紹
.subscribe(new Consumer<Category>() {
@Override
public void accept(Category category) throws Exception {
Log.e(TAG, category.getName());
}
});
對比一下這種方式結構清晰了很多,和flatMap()一樣的還有一個操作符contactMap(),唯一的區別就是flatMap()是無序的,最後輸出的和原序列不一定相同。contactMap()是有序的,最後輸出的和原序列相同
有關操作符就介紹到這裡,如果想了解其他操作符可以進入http://reactivex.io/進行檢視
五,執行緒控制(Scheduler)
(1),對於Android開發來說,這也是最有用的地方。我們經常需要在程式中做一些耗時的操作,比如:網路請求,檔案操作,資料庫操作。通常我們都是在子執行緒中做一些耗時操作,在主執行緒中更新ui。
(2),Scheduler都有哪些有用的方法呢
- Schedulers.newThread()
總是啟用新執行緒,並在新執行緒執行操作 - Schedulers.io()
代表io操作的執行緒, 通常用於網路,讀寫檔案等io密集型的操作。和newThread()差不多,不過Schedulers.io()這裡內部用到了執行緒池,比newThread()效率更高 - Schedulers.computation()
代表CPU計算密集型的操作, 例如需要大量計算的操作 - AndroidSchedulers.mainThread()
代表Android的主執行緒
(3),當我們建立一個Observable/Flowable預設是在主執行緒中進行,如下程式碼:
Flowable.just("Hello World", "Very Good")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
Log.e(TAG, Thread.currentThread().getName());
}
});
//列印結果 main
如上程式碼,不管是觀察者還是被觀察者,如果不指定Scheduler,它就是預設的執行緒。開發過程中往往我們需要指定執行緒,如何進行變換呢?
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
Log.e(TAG, "subscribe " + Thread.currentThread().getName());
e.onNext(1);
}
}, BackpressureStrategy.BUFFER);
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept " + Thread.currentThread().getName());
}
};
flowable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
//列印結果 subscribe RxNewThreadScheduler-1
accept main
可以看出,被觀察者在子執行緒中進行,觀察者在主執行緒中進行,主要是因為
subscribeOn(Schedulers.newThread())
observeOn(AndroidSchedulers.mainThread())
subscribeOn()指定上游被觀察者的執行緒,observeOn()指定下游觀察者的執行緒
有關RxJava2.x就簡單介紹到這裡,如果有時間會做一個專案,系統的使用這個框架。本文並非完全原創,參考了一些文章加上自己的理解,簡化了一些內容,沒有涉及原始碼,對於初學者來說容易上手