Rxjava2的學習與總結
阿新 • • 發佈:2019-01-06
Rxjava2基礎認知
-
形式正確的有限Observable 呼叫觀察者的onCompleted正好一次或者它的onError正好一次,而且此後不能再呼叫觀察者的任何其它方法。如果onComplete 或者 onError 走任何一個 都會 主動解除訂閱關係;
- 如果解除訂閱關係以後在發射 onError 則會 報錯;而發射onComplete則不會。
- 注意解除訂閱關係 還是可以發射 onNext
-
Disposable類:
- dispose():主動解除訂閱
- isDisposed():查詢是否解除訂閱 true 代表 已經解除訂閱
- CompositeDisposable類:可以快速解除所有新增的Disposable類 每當我們得到一個Disposable時就呼叫CompositeDisposable.add()將它新增到容器中, 在退出的時候, 呼叫CompositeDisposable.clear() 即可快速解除.
CompositeDisposable compositeDisposable=newCompositeDisposable();
Observable.create(newObservableOnSubscribe<Integer>(){
@Override
publicvoid subscribe(ObservableEmitter<Integer> emitter)throwsException{
emitter.onNext(1);
emitter.onComplete();或者 emitter.onError
(newThrowable("O__O "));}
}).subscribe(newObserver<Integer>(){
privateDisposable mDisposable;
@Override
publicvoid onSubscribe(Disposable d){
<!--訂閱-->
mDisposable = d;
<!--新增到容器中-->
compositeDisposable.add(d);
}
@Override
publicvoid onNext(Integer value){
<!--
判斷mDisposable.isDisposed()如果解除了則不需要處理-->}
@Override
publicvoid onError(Throwable e){
}
@Override
publicvoid onComplete(){
}
});
<!--解除所有訂閱者-->
compositeDisposable.clear();
基礎概念
- Scheduler scheduler
timer() alt+點選timer可檢視 關於timer的方法 可以看到時候有這個引數的變體!
- Callable bufferSupplier:自定義裝載的容器
Observable.range(1,10)
//() -> new ArrayList<>() 則是bufferSupplier
.buffer(2,1,()->newArrayList<>())
.subscribe(integers ->System.out.println(integers));
建立操作
- create : 建立一個具有發射能力的Observable
Observable.create(e ->{
e.onNext("Love");
e.onNext("For");
e.onNext("You!");
e.onComplete();
}).subscribe(s ->System.out.println(s));
- just:只是簡單的原樣發射,可將陣列或Iterable當做單個數據。它接受一至九個引數
Observable.just("Love","For","You!")
.subscribe(s ->System.out.println(s));
- empty:建立一個不發射任何資料但是正常終止的Observable
- never:建立一個不發射資料也不終止的Observable
- error:建立一個不發射資料以一個錯誤終止的Observable
Observable.empty();
Observable.never();
Observable.error(newThrowable("O__O"))
- timer 在延遲一段給定的時間後發射一個簡單的數字0
Observable.timer(1000,TimeUnit.MILLISECONDS)
.subscribe(s ->System.out.println(s));
- range:
- start:起始值
- count:一個是範 圍的資料的數目。0不傳送 ,負數 異常
Observable.range(5,3)
//輸出 5,6,7
.subscribe(s ->System.out.println(s));
- intervalRange
- start,count:同range
- initialDelay 傳送第一個值的延遲時間
- period 每兩個發射物的間隔時間
- unit,scheduler 額你懂的
Observable.intervalRange(5,100,3000,100,
TimeUnit.MILLISECONDS,Schedulers.io())
.subscribe(s ->System.out.println(s));
- interval:相當於intervalRange的start=0;
period 這個值一旦設定後是不可變化的
//period 以後的美每次間隔 這個值一旦設定後是不可變化的 所以 count方法無效的!
int[] s =newint[]{0};
Observable.interval(3000,100+ count(s),TimeUnit.MILLISECONDS,Schedulers.io())
.subscribe(s2 ->System.out.println(s2));
privateint count(int[] s){
int result = s[0]*1000;
s[0]= s[0]+1;
return result;
}
- defer 直到有觀察者訂閱時才建立Observable,並且為每個觀察者建立一個新的Observable
Observable.defer(()->Observable.just("Love","For","You!"))
.subscribe(s ->System.out.println(s));
- from系列
- fromArray
Integer[] items ={0,1,2,3,4,5};
Observable.fromArray(items).subscribe(
integer ->System.out.println(integer));
- fromCallable
Observable.fromCallable(()->Arrays.asList("hello","gaga"))
.subscribe(strings ->System.out.println(strings))
- fromIterable
Observable.fromIterable(Arrays.<String>asList("one","two","three"))
.subscribe(integer ->System.out.println(integer));
- fromFuture
Observable.fromFuture(Observable.just(1).toFuture())
.doOnComplete(()->System.out.println("complete"))
.subscribe();
- fromArray
過濾操作
- elementAt:只發射第N項資料
<!-- 無預設值版本 -->
Observable.just(1,2)
.elementAt(0)
.subscribe(o -> System.out.print(o ));//結果:1
<!-- 帶預設值的變體版本 -->
Observable.range(0, 10)
// 如果索引值大於資料 項數,它會發射一個預設值(通過額外的引數指定),而不是丟擲異常。
// 但是如果你傳遞一 個負數索引值,它仍然會丟擲一個 IndexOutOfBoundsException 異常。
.elementAt(100, -100)
.subscribe(o -> System.out.print(o + "t"));
-
IgnoreElements:如果你不關心一個Observable發射的資料,但是希望在它完成時或遇到錯誤終止時收到通知
Observable.range(0,10)
.ignoreElements()
.subscribe(()->System.out.println("complete")
, throwable ->System.out.println("throwable"));
-
take系列
- 變體 count系列:只發射前面的N項資料
Observable.range(0,10)
.take(3)
.subscribe(o ->System.out.print(o +"t"))
- 變體 time系列: 發射Observable開始的那段時間發射 的資料,
Observable.range(0,10)
.take(100,TimeUnit.MILLISECONDS)
.subscribe(o ->System.out.print(o +"t"));
-
takeLast
- 變體 count系列:只發射後面的N項資料
Observable.range(0,10)
.takeLast(3)
.subscribe(o ->System.out.print(o +"t"));
- 變體 time系列: 發射在原始Observable的生命周 期內最後一段時間內發射的資料
Observable.range(0,10)
.takeLast(100,TimeUnit.MILLISECONDS)
.subscribe(o ->System.out.print(o +"t"));
- takeUntil:傳送complete的結束條件 當然傳送結束之前也會包括這個值
Observable.just(2,3,4,5)
//傳送complete的結束條件 當然傳送結束之前也會包括這個值
.takeUntil(integer -> integer>3)
.subscribe(o ->System.out.print(o +"t"));//2,3,4
- takeWhile:當不滿足這個條件 會發送結束 不會包括這個值
Observable.just(2,3,4,5)
//當不滿足這個條件 會發送結束 不會包括這個值
.takeWhile(integer ->integer<=4)
.subscribe(o ->System.out.print(o +"t"));//2,3,4
-
skip系列
- 變體 count系列:丟棄Observable發射的前N項資料
Observable.range(0,5)
.skip(3)
.subscribe(o ->System.out.print(o +"t"));
- 變體 time系列:丟棄原始Observable開始的那段時間發 射的資料
Observable.range(0,5)
.skip(3)
.subscribe(o ->System.out.print(o +"t"));
- 變體 count系列:丟棄Observable發射的前N項資料
-
skipLast
- 變體 count系列:丟棄Observable發射的前N項資料
Observable.range(0,5)
.skipLast(3)
.subscribe(o ->System.out.print(o +"t"));
- 變體 time系列:丟棄在原始Observable的生命周 期內最後一段時間內發射的資料
Observable.range(0,10)
.skipLast(100,TimeUnit.MILLISECONDS)
.subscribe(o ->System.out.print(o +"t"));
-
distinct:去重
-
keySelector:這個函式根據原始Observable發射的資料項產生一個 Key,然後,比較這些Key而不是資料本身,來判定兩個資料是否是不同的
Observable.just(1,2,1,2,3)
//這個函式根據原始Observable發射的資料項產生一個 Key,
// 然後,比較這些Key而不是資料本身,來判定兩個資料是否是不同的
.distinct(integer ->Math.random())
.subscribe(o ->System.out.print(o +"t"));
日誌:
原因 key不同所以當做資料不同處理
12123
- 無參版本 就是內部實現了的keySelector通過生成的key就是value本身
Observable.just(1,2,1,2,3)
.distinct()
.subscribe(o ->System.out.print(o +"t"));
日誌:
123
-
-
distinctUntilChanged(相鄰去重):它只判定一個數據和它的直接前驅是 否是不同的。
其他概念與distinct一樣
- throttleWithTimeout/debounce:
操作符會過濾掉髮射速率過快的資料項 throttleWithTimeout/debounce: 含義相同 如果傳送資料後 指定時間段內沒有新資料的話 。則傳送這條 如果有新資料 則以這個新資料作為將要傳送的資料項,並且重置這個時間段,重新計時。
Observable.create(e ->{
e.onNext("onNext 0");
Thread.sleep(100);
e.onNext("onNext 1");
Thread.sleep(230);
e.onNext("onNext 2");
Thread.sleep(300);
e.onNext("onNext 3");
Thread.sleep(400);
e.onNext("onNext 4");
Thread.sleep(500);
e.onNext("onNext 5");
e.onNext("onNext 6");
})
.debounce(330,TimeUnit.MILLISECONDS)
// .throttleWithTimeout(330, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(o ->System.out.println(o));//結果 3 4 6
-
filter:只發射通過了謂詞測試的資料項
Observable.range(0,10)
//過濾掉false的元素
.filter(integer -> integer %2==0)
.subscribe(o ->System.out.print(o +"t"));
- ofType:ofType 是 filter 操作符的一個特殊形式。它過濾一個Observable只返回指定型別的資料
Observable.just(0,"what?",1,"String",3)
//ofType 是 filter 操作符的一個特殊形式。它過濾一個Observable只返回指定型別的資料。
.ofType(String.class)
.subscribe(o ->System.out.print(o +"t"));
- first:只發射第一項(或者滿足某個條件的第一項)數 感覺和take(1) elementAt(0)差不多
Observable.range(0,10)
//如果元資料沒有傳送 則有傳送預設值
.first(-1)
.subscribe(o ->System.out.print(o +"t"));
- last:只發射最後一項(或者滿足某個條件的最後一項)資料 感覺和takeLast(1)差不多
Observable.empty()
//如果元資料沒有傳送 則有傳送預設值
.last(-1)
.subscribe(o ->System.out.print(o +"t"));
- sample/throttleLast: 週期取樣後 傳送最後的資料
- throttleFirst:週期取樣 的第一條資料 傳送
注意: 如果是已經被髮送過的 則不會繼續傳送
Observable.create(e ->{
e.onNext("onNext 0");
Thread.sleep(100);
e.onNext("onNext 1");
Thread.sleep(50);
e.onNext("onNext 2");
Thread.sleep(70);
e.onNext("onNext 3");
Thread.sleep(200);
e.onNext("onNext 4");
e.onNext("onNext 5");
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
<!--結果: onNext 2 onNext 3 onNext 5-->
.sample(200,TimeUnit.MILLISECONDS,Schedulers.newThread())
<!--結果: onNext 2 onNext 3 onNext 5-->
// .throttleLast(200, TimeUnit.MILLISECONDS,Schedulers.newThread())
<!--結果: onNext 0 onNext 3 onNext 4-->
// .throttleFirst(200, TimeUnit.MILLISECONDS,Schedulers.newThread())
.subscribe(o ->System.out.print(o +"t"));
輔助操作
- repeat:不是建立一個Observable,而是重複發射原始,Observable的資料序列,這個序列或者是無限的,或者通過 repeat(n) 指定重複次數
Observable.just("Love","For","You!")
.repeat(3)//重複三次
.subscribe(s ->System.out.println(s));
- repeatUntil:getAsBoolean 如果返回 true則不repeat false則repeat.主要用於動態控制
Observable.just("Love","For","You!")
.repeatUntil(newBooleanSupplier(){
@Override
publicboolean getAsBoolean()throwsException{
System.out.println("getAsBoolean");
count++;
if(count ==3)
returntrue;
else
returnfalse;
}
}).subscribe(s ->System.out.println(s));
- delay:延遲一段指定的時間再發射來自Observable的發射物
注意: delay 不會平移 onError 通知,它會立即將這個通知傳遞給訂閱者,同時丟棄任何待 發射的 onNext 通知。 然而它會平移一個 onCompleted 通知
Observable.range(0,3)
.delay(1400,TimeUnit.MILLISECONDS)
.subscribe(o ->System.out.println("===>"+ o +"t"));
- delaySubscription:讓你你可以延遲訂閱原始Observable
Observable.just(1)
.delaySubscription(2000,TimeUnit.MILLISECONDS)
.subscribe(o ->System.out.println("===>"+ o +"t")
, throwable ->System.out.println("===>throwable")
,()->System.out.println("===>complete")
, disposable ->System.out.println("===>訂閱"));
-
do系列
- doOnEach:註冊一個回撥,它產生的Observable每發射一項資料就會呼叫它一次
Observable.range(0,3)
.doOnEach(integerNotification ->System.out.println(integerNotification.getValue()))
.subscribe(o ->System.out.print("===>"+ o +"t"));
日誌:
doOnEach:
doOnEach:0===>0
doOnEach:1===>1
doOnEach:2===>2
doOnEach:null
- doOnNext:注類似doOnEach 不是接受一個 Notification 引數,而是接受發射的資料項。
- doOnEach:註冊一個回撥,它產生的Observable每發射一項資料就會呼叫它一次