RxJS 核心概念之Subject
本文出處:https://segmentfault.com/a/1190000005069851,我做了一部分修改
我們在接觸到RxJS的時候,不免會有點暈頭轉向的感覺,對於什麼是Subject,什麼是Observer,什麼是Observable,總感覺暈乎乎的。下面我引用一篇為自認為比較通俗易懂的博文,再加上自己的描述來給大家解釋下,弄明白之後對於學習Angular2+有很大的幫助,因為在angular2+裡,在很多場景裡,RxJS把promise取代了。至於為什麼取代了。請大家檢視這篇文章的前半部分:http://blog.csdn.net/ac_hell/article/details/68069224。
什麼是Subject?
Subject是一種可以多路推送的可觀察物件。與EventEmitter類似,Subject維護著自己的Observer。
每一個Subject都是一個Observable(可觀察物件) 對於一個Subject,你可以訂閱(subscribe
)它,Observer會和往常一樣接收到資料。從Observer的視角看,它並不能區分自己的執行環境是普通Observable的單路推送還是基於Subject的多路推送。
Subject的內部實現中,並不會在被訂閱(subscribe
)後建立新的執行環境。它僅僅會把新的Observer註冊在由它本身維護的Observer列表中,這和其他語言、庫中的addListener
機制類似。
每一個Subject也可以作為Observer(觀察者) Subject同樣也是一個由next(v)
error(e)
,和 complete()
這些方法組成的物件。呼叫next(theValue)
方法後,Subject會向所有已經在其上註冊的Observer多路推送theValue
。
下面的例子中,我們在Subject上註冊了兩個Observer,並且多路推送了一些數值:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);
控制檯輸出結果如下:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
既然Subject是一個Observer,你可以把它作為subscribe
(訂閱)普通Observable時的引數,如下面例子所示:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // 你可以傳遞Subject來訂閱observable
執行後結果如下:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
通過上面的實現:我們發現可以通過Subject將普通的Observable單路推送轉換為多路推送。這說明了Subject的作用——作為單路Observable轉變為多路Observable的橋樑。
還有幾種特殊的Subject
型別,分別是BehaviorSubject
,ReplaySubject
,和 AsyncSubject
。
多路推送的Observable
在以後的語境中,每當提到“多路推送的Observable”,我們特指通過Subject構建的Observable執行環境。否則“普通的Observable”只是一個不會共享執行環境並且被訂閱後才生效的一系列值。
通過使用Subject可以建立擁有相同執行環境的多路的Observable。
下面展示了多路
的運作方式:Subject從普通的Observable訂閱了資料,然後其他Observer又訂閱了這個Subject,示例如下:
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// 通過`subject.subscribe({...})`訂閱Subject的Observer:
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
// 讓Subject從資料來源訂閱開始生效:
multicasted.connect();
multicast
方法返回一個類似於Observable的可觀察物件,但是在其被訂閱後,它會表現Subject的特性。 multicast
返回的物件同時是ConnectableObservable
型別的,擁有connect()
方法。
connect()
方法非常的重要,它決定Observable何時開始執行。由於呼叫connect()
後,Observable開始執行,因此,connect()
會返回一個Subscription
供呼叫者來終止執行。
引用計數
通過手動呼叫connect()
返回的Subscription控制執行十分繁雜。通常,我們希望在有第一個Observer訂閱Subject後自動connnect
,當所有Observer都取消訂閱後終止這個Subject。
我們來分析一下下面例子中subscription的過程:
-
第一個Observer 訂閱了多路推送的 Observable
-
多路Observable被連線
-
向第一個Observer傳送 值為
0
的next
通知 -
第二個Observer訂閱了多路推送的 Observable
-
向第一個Observer傳送 值為
1
的next
通知 -
向第二個Observer傳送 值為
1
的next
通知 -
第一個Observer取消了對多路推送的Observable的訂閱
-
向第二個Observer傳送 值為
2
的next
通知 -
第二個Observer取消了對多路推送的Observable的訂閱
-
取消對多路推送的Observable的連線
通過顯式地呼叫connect()
,程式碼如下:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
subscription1.unsubscribe();
}, 1200);
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe();
}, 2000);
如果你不想顯式地呼叫connect()
方法,可以在ConnectableObservable型別的Observable上呼叫refCount()
方法。方法會進行引用計數:記錄Observable被訂閱的行為。當訂閱數從 0
到 1
時refCount()
會呼叫connect()
方法。到訂閱數從1
到 0
,他會終止整個執行過程。這裡要解釋一下,這個0,1這些數是怎麼來的,因為interval操作符返回一個在規定的毫秒間隔時間內產生增加數的Observable。通俗點來說,這裡就是每隔500ms就產生一個從0開始自增的數字。
refCount
使得多路推送的Observable在被訂閱後自動執行,在所有觀察者取消訂閱後,停止執行。
下面是示例:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
執行輸出結果如下:
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed
只有ConnectableObservables擁有refCount()
方法,呼叫後會返回一個Observable
而不是新的ConnectableObservable。
BehaviorSubject
BehaviorSubject
是Subject的一個衍生類,具有“最新的值”的概念。它總是儲存最近向資料消費者傳送的值,當一個Observer訂閱後,它會即刻從BehaviorSubject
收到“最新的值”。
BehaviorSubjects非常適於表示“隨時間推移的值”。舉一個形象的例子,Subject表示一個人的生日,而Behavior則表示一個人的歲數。(生日只是一天,一個人的歲數會保持到下一次生日之前。)
下面例子中,展示瞭如何用 0
初始化BehaviorSubject,當Observer訂閱它時,0
是第一個被推送的值。緊接著,在第二個Observer訂閱BehaviorSubject之前,它推送了2
,雖然訂閱在推送2
之後,但是第二個Observer仍然能接受到2
:
var subject = new Rx.BehaviorSubject(0 /* 初始值 */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
輸出結果如下:
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
ReplaySubject
ReplaySubject
如同於BehaviorSubject
是 Subject
的子類。通過 ReplaySubject
可以向新的訂閱者推送舊數值,就像一個錄影機ReplaySubject
可以記錄Observable的一部分狀態(過去時間內推送的值)。
.一個ReplaySubject
可以記錄Observable執行過程中推送的多個值,並向新的訂閱者回放它們。
你可以指定回放值的數量:
var subject = new Rx.ReplaySubject(3 /* 回放數量 */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
輸出如下:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
除了回放數量,你也可以以毫秒為單位去指定“視窗時間”,決定ReplaySubject記錄多久以前Observable推送的數值。下面的例子中,我們把回放數量設定為100
,把視窗時間設定為500
毫秒:
var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
var i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);
第二個Observer接受到3
(600ms), 4
(800ms)
和 5
(1000ms),這些值均在訂閱之前的500
毫秒內推送(視窗長度
1000ms - 600ms = 400ms < 500ms):
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...
AsyncSubject
AsyncSubject是Subject的另外一個衍生類,Observable僅會在執行完成後,推送執行環境中的最後一個值。
var subject = new Rx.AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
輸出結果如下:
observerA: 5
observerB: 5
AsyncSubject 與 last()
操作符相似,等待完成通知後推送執行過程的最後一個值。