RXJAVA2x 心得記錄
RXJava的初窺
1.RxJava是Reactive Extensions的Java VM實現:一個用於通過使用可觀察序列來編寫非同步和基於事件的程式的庫,然而,對於初學者來說,這太難看懂了。因為它是一個『總結』,而初學者更需要一個『引言』。其實, RxJava 的本質可以壓縮為非同步這一個詞。說到根上,它就是一個實現非同步操作的庫,而別的定語都是基於這之上的。
2.RxJava 有四個基本概念:Observable
(可觀察者,即被觀察者)、 Observer
(觀察者)、 subscribe
(訂閱)、事件。Observable
和 Observer
通過 subscribe()
Observable
可以在需要的時候發出事件來通知 Observer
。與傳統觀察者模式不同, RxJava 的事件回撥方法除了普通事件 onNext()
(相當於 onClick()
/ onEvent()
)之外,還定義了兩個特殊的事件:onCompleted()
和 onError()
。
- onCompleted(): 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的onNext() 發出時,需要觸發 onCompleted() 方法作為標誌。
- onError(): 事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。
- 在一個正確執行的事件序列中, onCompleted
()
和 onError(): 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted()
和 onError():二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。
一丶基本使用
1.設定依賴項 implementation "io.reactivex.rxjava2:rxjava:2.2.2"
2. 建立Observer
Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer介面的實現方式:
Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) { Log.d("MainActivity", "Item: " + s); } @Override public void onCompleted() { Log.d("MainActivity", "Completed!"); } @Override public void onError(Throwable e) { Log.d("MainActivity", "Error!"); } };
3. 建立 Observable
Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 Create()
方法來建立一個 Observable ,併為它定義事件觸發規則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Since");
subscriber.onNext("Hi");
subscriber.onNext("SIncerity");
subscriber.onCompleted();
}
});
可以看到,這裡傳入了一個 OnSubscribe
物件作為引數。OnSubscribe
會被儲存在返回的 Observable
物件中,它的作用相當於一個計劃表,當 Observable
被訂閱的時候,OnSubscribe
的 call()
方法會自動被呼叫,事件序列就會依照設定依次觸發(對於上面的程式碼,就是觀察者Subscriber
將會被呼叫三次 onNext()
和一次 onCompleted()
)。這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。
4.Subscribe (訂閱)
建立了 Observable
和 Observer
之後,再用 subscribe()
方法將它們聯結起來,整條鏈子就可以工作了。程式碼形式很簡單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
5.執行緒控制 Scheduler
在不指定執行緒的情況下, RxJava 遵循的是執行緒不變的原則,即:在哪個執行緒呼叫 subscribe()
,就在哪個執行緒生產事件;在哪個執行緒生產事件,就在哪個執行緒消費事件。如果需要切換執行緒,就需要用到 Scheduler
(排程器)。
在RxJava 中,Scheduler
——排程器,相當於執行緒控制器,RxJava 通過它來指定每一段程式碼應該執行在什麼樣的執行緒。RxJava 已經內建了幾個 Scheduler
,它們已經適合大多數的使用場景:
- Schedulers.immediate(): 直接在當前執行緒執行,相當於不指定執行緒。這是預設的 Scheduler。
- Schedulers.newThread(): 總是啟用新執行緒,並在新執行緒執行操作。
- Schedulers.io: I/O 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況 io() 比new thread() 更有效率。不要把計算工作放在 io 中,可以避免建立不必要的執行緒。
- Schedulers
.
computation() : 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在computation() 中,否則 I/O 操作的等待時間會浪費 CPU。 - 另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主執行緒執行。
有了這幾個 Scheduler
,就可以使用 subscribeOn()
和 observeOn()
兩個方法來對執行緒進行控制了。 * subscribeOn()
: 指定 subscribe()
所發生的執行緒,即 Observable.OnSubscribe
被啟用時所處的執行緒。或者叫做事件產生的執行緒。 * observeOn()
: 指定 Subscriber
所執行在的執行緒。或者叫做事件消費的執行緒。
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 執行緒
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主執行緒
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
二丶應用場景
一.專案需求- 刪除後N秒撤回
補充知識點 ,1. RxJava 還提供了一些方法用來快捷建立事件佇列 如:Observable.just(...)
Observable observable = Observable.just("Since", "Hi", "SIncerity");
// 將會依次呼叫:
// onNext("Since");
// onNext("Hi");
// onNext("SIncerity");
// onCompleted();
首先先定義 Subscribe
/**
* 延遲2s TimeUnit.SECONDS 時間單位為秒 包含所有的日期單位
**/
Subscription subscribe = Observable.just(1).delay(2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//去做刪除操作
}
});
定義一個 Snackbar來完成回退操作
Snackbar snackbar = Snackbar.make(mynoteMain, title, Snackbar.LENGTH_SHORT)
.setAction(action, new View.OnClickListener() {
@Override
public void onClick(View v) {
//點選Snackbar執行的操作 解除訂閱
//subscription.unsubscribe();
}
});
即可完成刪除後回退操作 效果如下