1. 程式人生 > >Rxjava2的學習與總結

Rxjava2的學習與總結

Rxjava2基礎認知

  • 形式正確的有限Observable 呼叫觀察者的onCompleted正好一次或者它的onError正好一次,而且此後不能再呼叫觀察者的任何其它方法。如果onComplete 或者 onError 走任何一個 都會 主動解除訂閱關係;

    • 如果解除訂閱關係以後在發射 onError 則會 報錯;而發射onComplete則不會。
    • 注意解除訂閱關係 還是可以發射 onNext
  • Disposable類:

    • dispose():主動解除訂閱
    • isDisposed():查詢是否解除訂閱 true 代表 已經解除訂閱
  • CompositeDisposable類:可以快速解除所有新增的Disposable類 每當我們得到一個Disposable時就呼叫CompositeDisposable.add()將它新增到容器中, 在退出的時候, 呼叫CompositeDisposable.clear() 即可快速解除.
  1. CompositeDisposable compositeDisposable=newCompositeDisposable();
  2. Observable.create(newObservableOnSubscribe<Integer>(){
  3. @Override
  4. publicvoid subscribe(ObservableEmitter<Integer> emitter)throwsException{
  5. emitter.onNext(1);
  6. emitter.onComplete();或者 emitter.onError
    (newThrowable("O__O "));
  7. }
  8. }).subscribe(newObserver<Integer>(){
  9. privateDisposable mDisposable;
  10. @Override
  11. publicvoid onSubscribe(Disposable d){
  12. <!--訂閱-->
  13. mDisposable = d;
  14. <!--新增到容器中-->
  15. compositeDisposable.add(d);
  16. }
  17. @Override
  18. publicvoid onNext(Integer value){
  19. <!--
    判斷mDisposable.isDisposed()如果解除了則不需要處理-->
  20. }
  21. @Override
  22. publicvoid onError(Throwable e){
  23. }
  24. @Override
  25. publicvoid onComplete(){
  26. }
  27. });
  28. <!--解除所有訂閱者-->
  29. compositeDisposable.clear();

基礎概念

  • Scheduler scheduler

timer() alt+點選timer可檢視 關於timer的方法 可以看到時候有這個引數的變體!

  • Callable bufferSupplier:自定義裝載的容器
  1. Observable.range(1,10)
  2. //() -> new ArrayList<>() 則是bufferSupplier
  3. .buffer(2,1,()->newArrayList<>())
  4. .subscribe(integers ->System.out.println(integers));

建立操作

  • create : 建立一個具有發射能力的Observable
  1. Observable.create(e ->{
  2. e.onNext("Love");
  3. e.onNext("For");
  4. e.onNext("You!");
  5. e.onComplete();
  6. }).subscribe(s ->System.out.println(s));
  • just:只是簡單的原樣發射,可將陣列或Iterable當做單個數據。它接受一至九個引數
  1. Observable.just("Love","For","You!")
  2. .subscribe(s ->System.out.println(s));
  • empty:建立一個不發射任何資料但是正常終止的Observable
  • never:建立一個不發射資料也不終止的Observable
  • error:建立一個不發射資料以一個錯誤終止的Observable
  1. Observable.empty();
  2. Observable.never();
  3. Observable.error(newThrowable("O__O"))
  • timer 在延遲一段給定的時間後發射一個簡單的數字0
  1. Observable.timer(1000,TimeUnit.MILLISECONDS)
  2. .subscribe(s ->System.out.println(s));
  • range:
    • start:起始值
    • count:一個是範 圍的資料的數目。0不傳送 ,負數 異常
  1. Observable.range(5,3)
  2. //輸出 5,6,7
  3. .subscribe(s ->System.out.println(s));
  • intervalRange
    • start,count:同range
    • initialDelay 傳送第一個值的延遲時間
    • period 每兩個發射物的間隔時間
    • unit,scheduler 額你懂的
  1. Observable.intervalRange(5,100,3000,100,
  2. TimeUnit.MILLISECONDS,Schedulers.io())
  3. .subscribe(s ->System.out.println(s));
  • interval:相當於intervalRange的start=0;

    period 這個值一旦設定後是不可變化的

  1. //period 以後的美每次間隔 這個值一旦設定後是不可變化的 所以 count方法無效的!
  2. int[] s =newint[]{0};
  3. Observable.interval(3000,100+ count(s),TimeUnit.MILLISECONDS,Schedulers.io())
  4. .subscribe(s2 ->System.out.println(s2));
  5. privateint count(int[] s){
  6. int result = s[0]*1000;
  7. s[0]= s[0]+1;
  8. return result;
  9. }
  • defer 直到有觀察者訂閱時才建立Observable,並且為每個觀察者建立一個新的Observable
  1. Observable.defer(()->Observable.just("Love","For","You!"))
  2. .subscribe(s ->System.out.println(s));
  • from系列
    • fromArray
      1. Integer[] items ={0,1,2,3,4,5};
      2. Observable.fromArray(items).subscribe(
      3. integer ->System.out.println(integer));
    • fromCallable
      1. Observable.fromCallable(()->Arrays.asList("hello","gaga"))
      2. .subscribe(strings ->System.out.println(strings))
    • fromIterable
      1. Observable.fromIterable(Arrays.<String>asList("one","two","three"))
      2. .subscribe(integer ->System.out.println(integer));
    • fromFuture
      1. Observable.fromFuture(Observable.just(1).toFuture())
      2. .doOnComplete(()->System.out.println("complete"))
      3. .subscribe();

過濾操作

  • elementAt:只發射第N項資料
  1. <!-- 無預設值版本 -->
  2. Observable.just(1,2)
  3. .elementAt(0)
  4. .subscribe(o -> System.out.print(o ));//結果:1
  5. <!-- 帶預設值的變體版本 -->
  6. Observable.range(0, 10)
  7. // 如果索引值大於資料 項數,它會發射一個預設值(通過額外的引數指定),而不是丟擲異常。
  8. // 但是如果你傳遞一 個負數索引值,它仍然會丟擲一個 IndexOutOfBoundsException 異常。
  9. .elementAt(100, -100)
  10. .subscribe(o -> System.out.print(o + "t"));
  • IgnoreElements:如果你不關心一個Observable發射的資料,但是希望在它完成時或遇到錯誤終止時收到通知

    1. Observable.range(0,10)
    2. .ignoreElements()
    3. .subscribe(()->System.out.println("complete")
    4. , throwable ->System.out.println("throwable"));
  • take系列

    • 變體 count系列:只發射前面的N項資料
    1. Observable.range(0,10)
    2. .take(3)
    3. .subscribe(o ->System.out.print(o +"t"))
    • 變體 time系列: 發射Observable開始的那段時間發射 的資料,
    1. Observable.range(0,10)
    2. .take(100,TimeUnit.MILLISECONDS)
    3. .subscribe(o ->System.out.print(o +"t"));
  • takeLast

    • 變體 count系列:只發射後面的N項資料
    1. Observable.range(0,10)
    2. .takeLast(3)
    3. .subscribe(o ->System.out.print(o +"t"));
    • 變體 time系列: 發射在原始Observable的生命周 期內最後一段時間內發射的資料
    1. Observable.range(0,10)
    2. .takeLast(100,TimeUnit.MILLISECONDS)
    3. .subscribe(o ->System.out.print(o +"t"));
  • takeUntil:傳送complete的結束條件 當然傳送結束之前也會包括這個值
  1. Observable.just(2,3,4,5)
  2. //傳送complete的結束條件 當然傳送結束之前也會包括這個值
  3. .takeUntil(integer -> integer>3)
  4. .subscribe(o ->System.out.print(o +"t"));//2,3,4
  • takeWhile:當不滿足這個條件 會發送結束 不會包括這個值
  1. Observable.just(2,3,4,5)
  2. //當不滿足這個條件 會發送結束 不會包括這個值
  3. .takeWhile(integer ->integer<=4)
  4. .subscribe(o ->System.out.print(o +"t"));//2,3,4
  • skip系列

    • 變體 count系列:丟棄Observable發射的前N項資料
      1. Observable.range(0,5)
      2. .skip(3)
      3. .subscribe(o ->System.out.print(o +"t"));
    • 變體 time系列:丟棄原始Observable開始的那段時間發 射的資料
      1. Observable.range(0,5)
      2. .skip(3)
      3. .subscribe(o ->System.out.print(o +"t"));
  • skipLast

    • 變體 count系列:丟棄Observable發射的前N項資料
    1. Observable.range(0,5)
    2. .skipLast(3)
    3. .subscribe(o ->System.out.print(o +"t"));
    • 變體 time系列:丟棄在原始Observable的生命周 期內最後一段時間內發射的資料
      1. Observable.range(0,10)
      2. .skipLast(100,TimeUnit.MILLISECONDS)
      3. .subscribe(o ->System.out.print(o +"t"));
  • distinct:去重

    • keySelector:這個函式根據原始Observable發射的資料項產生一個 Key,然後,比較這些Key而不是資料本身,來判定兩個資料是否是不同的

      1. Observable.just(1,2,1,2,3)
      2. //這個函式根據原始Observable發射的資料項產生一個 Key,
      3. // 然後,比較這些Key而不是資料本身,來判定兩個資料是否是不同的
      4. .distinct(integer ->Math.random())
      5. .subscribe(o ->System.out.print(o +"t"));
      6. 日誌:
      7. 原因 key不同所以當做資料不同處理
      8. 12123
    • 無參版本 就是內部實現了的keySelector通過生成的key就是value本身
    1. Observable.just(1,2,1,2,3)
    2. .distinct()
    3. .subscribe(o ->System.out.print(o +"t"));
    4. 日誌:
    5. 123
  • distinctUntilChanged(相鄰去重):它只判定一個數據和它的直接前驅是 否是不同的。

    其他概念與distinct一樣

  • throttleWithTimeout/debounce:

操作符會過濾掉髮射速率過快的資料項 throttleWithTimeout/debounce: 含義相同 如果傳送資料後 指定時間段內沒有新資料的話 。則傳送這條 如果有新資料 則以這個新資料作為將要傳送的資料項,並且重置這個時間段,重新計時。

  1. Observable.create(e ->{
  2. e.onNext("onNext 0");
  3. Thread.sleep(100);
  4. e.onNext("onNext 1");
  5. Thread.sleep(230);
  6. e.onNext("onNext 2");
  7. Thread.sleep(300);
  8. e.onNext("onNext 3");
  9. Thread.sleep(400);
  10. e.onNext("onNext 4");
  11. Thread.sleep(500);
  12. e.onNext("onNext 5");
  13. e.onNext("onNext 6");
  14. })
  15. .debounce(330,TimeUnit.MILLISECONDS)
  16. // .throttleWithTimeout(330, TimeUnit.MILLISECONDS)
  17. .subscribeOn(Schedulers.newThread())
  18. .observeOn(Schedulers.newThread())
  19. .subscribe(o ->System.out.println(o));//結果 3 4 6
  • filter:只發射通過了謂詞測試的資料項

    1. Observable.range(0,10)
    2. //過濾掉false的元素
    3. .filter(integer -> integer %2==0)
    4. .subscribe(o ->System.out.print(o +"t"));
  • ofType:ofType 是 filter 操作符的一個特殊形式。它過濾一個Observable只返回指定型別的資料
  1. Observable.just(0,"what?",1,"String",3)
  2. //ofType 是 filter 操作符的一個特殊形式。它過濾一個Observable只返回指定型別的資料。
  3. .ofType(String.class)
  4. .subscribe(o ->System.out.print(o +"t"));
  • first:只發射第一項(或者滿足某個條件的第一項)數 感覺和take(1) elementAt(0)差不多
  1. Observable.range(0,10)
  2. //如果元資料沒有傳送 則有傳送預設值
  3. .first(-1)
  4. .subscribe(o ->System.out.print(o +"t"));
  • last:只發射最後一項(或者滿足某個條件的最後一項)資料 感覺和takeLast(1)差不多
  1. Observable.empty()
  2. //如果元資料沒有傳送 則有傳送預設值
  3. .last(-1)
  4. .subscribe(o ->System.out.print(o +"t"));
  • sample/throttleLast: 週期取樣後 傳送最後的資料
  • throttleFirst:週期取樣 的第一條資料 傳送

    注意: 如果是已經被髮送過的 則不會繼續傳送

  1. Observable.create(e ->{
  2. e.onNext("onNext 0");
  3. Thread.sleep(100);
  4. e.onNext("onNext 1");
  5. Thread.sleep(50);
  6. e.onNext("onNext 2");
  7. Thread.sleep(70);
  8. e.onNext("onNext 3");
  9. Thread.sleep(200);
  10. e.onNext("onNext 4");
  11. e.onNext("onNext 5");
  12. })
  13. .subscribeOn(Schedulers.newThread())
  14. .observeOn(Schedulers.newThread())
  15. <!--結果: onNext 2 onNext 3 onNext 5-->
  16. .sample(200,TimeUnit.MILLISECONDS,Schedulers.newThread())
  17. <!--結果: onNext 2 onNext 3 onNext 5-->
  18. // .throttleLast(200, TimeUnit.MILLISECONDS,Schedulers.newThread())
  19. <!--結果: onNext 0 onNext 3 onNext 4-->
  20. // .throttleFirst(200, TimeUnit.MILLISECONDS,Schedulers.newThread())
  21. .subscribe(o ->System.out.print(o +"t"));

輔助操作

  • repeat:不是建立一個Observable,而是重複發射原始,Observable的資料序列,這個序列或者是無限的,或者通過 repeat(n) 指定重複次數
  1. Observable.just("Love","For","You!")
  2. .repeat(3)//重複三次
  3. .subscribe(s ->System.out.println(s));
  • repeatUntil:getAsBoolean 如果返回 true則不repeat false則repeat.主要用於動態控制
  1. Observable.just("Love","For","You!")
  2. .repeatUntil(newBooleanSupplier(){
  3. @Override
  4. publicboolean getAsBoolean()throwsException{
  5. System.out.println("getAsBoolean");
  6. count++;
  7. if(count ==3)
  8. returntrue;
  9. else
  10. returnfalse;
  11. }
  12. }).subscribe(s ->System.out.println(s));
  • delay:延遲一段指定的時間再發射來自Observable的發射物

    注意: delay 不會平移 onError 通知,它會立即將這個通知傳遞給訂閱者,同時丟棄任何待 發射的 onNext 通知。 然而它會平移一個 onCompleted 通知

  1. Observable.range(0,3)
  2. .delay(1400,TimeUnit.MILLISECONDS)
  3. .subscribe(o ->System.out.println("===>"+ o +"t"));
  • delaySubscription:讓你你可以延遲訂閱原始Observable
  1. Observable.just(1)
  2. .delaySubscription(2000,TimeUnit.MILLISECONDS)
  3. .subscribe(o ->System.out.println("===>"+ o +"t")
  4. , throwable ->System.out.println("===>throwable")
  5. ,()->System.out.println("===>complete")
  6. , disposable ->System.out.println("===>訂閱"));
  • do系列

    • doOnEach:註冊一個回撥,它產生的Observable每發射一項資料就會呼叫它一次
      1. Observable.range(0,3)
      2. .doOnEach(integerNotification ->System.out.println(integerNotification.getValue()))
      3. .subscribe(o ->System.out.print("===>"+ o +"t"));
      4. 日誌:
      5. doOnEach:
      6. doOnEach:0===>0
      7. doOnEach:1===>1
      8. doOnEach:2===>2
      9. doOnEach:null
    • doOnNext:注類似doOnEach 不是接受一個 Notification 引數,而是接受發射的資料項。