1. 程式人生 > >RxJava2(三)Subject和Processor

RxJava2(三)Subject和Processor

一, Subject

它既是一個Observable可不斷呼叫onNext發射資料,直到遇到onComplete結束,又是一個Observer可訂閱資料。它可以將訂閱的資料作為自己的資料發射出去。

包含4種類型:AsyncSubject,BehaviorSubject,ReplaySubject,PublishSubject。

AsyncSubject

觀察者/訂閱者只會接受到onComplete之前的最後一個數據。

AsyncSubject<Integer> subject = AsyncSubject.create();
        subject.
onNext(0); subject.onNext(1); subject.onNext(2); subject.onComplete(); subject.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer)
{ Log.d(TAG, "onNext: " + integer); Toast.makeText(MainActivity.this, integer, Toast.LENGTH_SHORT).show(); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.getMessage()); } @Override
public void onComplete() { Log.d(TAG, "onComplete" ); } }); }

執行結果:onNext: 2

BehaviorSubject

觀察者/訂閱者會收到訂閱之前的最後一個數據,再繼續接受之後發射過來的資料,若BehaviorSubject訂閱之前未發射過資料,則發射一個預設值。

  BehaviorSubject<Integer> subject = BehaviorSubject.createDefault(9);

        subject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {

            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "run: onComplete");
            }
        });
        subject.onNext(0);
        subject.onNext(1);
        subject.onComplete();

輸出結果:
accept:9
accept:0
accept:1
run: onComplete

ReplaySubject

1.無論何時訂閱,都將發射所有的原始資料給訂閱者。

ReplaySubject subject = ReplaySubject.create();

2.快取n條資料,當訂閱時只發送快取過的資料和之後資料。

 ReplaySubject subject = ReplaySubject.createWithSize(2);
        subject.onNext(0);
        subject.onNext(1);
        subject.onNext(3);
        subject.onNext(4);
        subject.onNext(5);

        subject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {

            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "run: onComplete");
            }
        });
        subject.onNext(6);
        subject.onNext(7);
        subject.onComplete();

輸出結果:
accept: 4
accept: 5
accept: 6
accept: 7
run: onComplete

PublishSubject

觀察者只接受PublishSubject訂閱之後的資料。

PublishSubject subject = PublishSubject.create();
        subject.onNext(0);
        subject.onNext(1);

        subject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {

            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "run: onComplete");
            }
        });
        subject.onNext(3);
        subject.onNext(4);

輸出結果:
accept: 3
accept: 4

二,Processor

Processor和Subject用法一樣,只是Processor支援被壓。
它也包含4中型別:AsyncProcessor, BehaviorProcessor,ReplayProcessor,PublishProcessor。
用法同Subject一樣。