RxJava2(三)Subject和Processor
阿新 • • 發佈:2018-11-11
一, Subject
它既是一個Observable可不斷呼叫onNext發射資料,直到遇到onComplete結束,又是一個Observer可訂閱資料。它可以將訂閱的資料作為自己的資料發射出去。
包含4種類型:AsyncSubject,BehaviorSubject,ReplaySubject,PublishSubject。
AsyncSubject
觀察者/訂閱者只會接受到onComplete之前的最後一個數據。
AsyncSubject<Integer> subject = AsyncSubject.create();
subject. onNext(0);
subject.onNext(1);
subject.onNext(2);
subject.onComplete();
subject.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
Toast.makeText(MainActivity.this, integer, Toast.LENGTH_SHORT).show();
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete" );
}
});
}
執行結果:onNext: 2
BehaviorSubject
觀察者/訂閱者會收到訂閱之前的最後一個數據,再繼續接受之後發射過來的資料,若BehaviorSubject訂閱之前未發射過資料,則發射一個預設值。
BehaviorSubject<Integer> subject = BehaviorSubject.createDefault(9);
subject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: onComplete");
}
});
subject.onNext(0);
subject.onNext(1);
subject.onComplete();
輸出結果:
accept:9
accept:0
accept:1
run: onComplete
ReplaySubject
1.無論何時訂閱,都將發射所有的原始資料給訂閱者。
ReplaySubject subject = ReplaySubject.create();
2.快取n條資料,當訂閱時只發送快取過的資料和之後資料。
ReplaySubject subject = ReplaySubject.createWithSize(2);
subject.onNext(0);
subject.onNext(1);
subject.onNext(3);
subject.onNext(4);
subject.onNext(5);
subject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: onComplete");
}
});
subject.onNext(6);
subject.onNext(7);
subject.onComplete();
輸出結果:
accept: 4
accept: 5
accept: 6
accept: 7
run: onComplete
PublishSubject
觀察者只接受PublishSubject訂閱之後的資料。
PublishSubject subject = PublishSubject.create();
subject.onNext(0);
subject.onNext(1);
subject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: onComplete");
}
});
subject.onNext(3);
subject.onNext(4);
輸出結果:
accept: 3
accept: 4
二,Processor
Processor和Subject用法一樣,只是Processor支援被壓。
它也包含4中型別:AsyncProcessor, BehaviorProcessor,ReplayProcessor,PublishProcessor。
用法同Subject一樣。