rxjs學習入門心得(一)Observable可觀察物件
前言
隨著開發中專案的越來越大,程式碼的要求越來越高,於是開始四處搜找各種js庫進行學習。為了學習程式碼行為模式,例如:競爭等等。在技術總監的指引下找到Rxjs進行學習,再次表以感謝。在看教程時,有很多地方不解,於是用部落格做以記錄,並將自己的經驗以儘可能簡單的方式分享給大家。
這裡簡單解釋一下Rxjs,RxJS 是一個js庫,它通過使用 observable 序列來編寫非同步和基於事件的程式。ReactiveX 結合了 觀察者模式、迭代器模式 和 使用集合的函數語言程式設計,以滿足以一種理想方式來管理事件序列所需要的一切。看到這你肯定疑問它有什麼用?你先放下這個疑問,先看看一個簡單的案例。
Observable可觀察物件
Observable可觀察物件:表示一個可呼叫的未來值或者事件的集合。
一個例子
通常你這樣註冊事件監聽:
var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));
使用RxJS建立一個可觀察物件:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click' )
.subscribe(() => console.log('Clicked!'));
看到這段程式碼你可能迷茫,這是什麼意思?難道只是換了一種寫法?
觀察者模式
要說Observable可觀察物件首先得說說:觀察者模式
。
觀察者模式
又叫釋出-訂閱(Publish/Subscribe)模式
:
他定義了一種一對多的依賴關係,讓多個觀察者物件同時監聽某一個主題物件(也可叫做抽象的通知者)。這個主題物件在狀態發生變化時,會通知所有觀察者物件,使他們能夠自動更新自己。而且各個觀察者之間相互獨立。
觀察者模式的結構中包含四種角色:
(1)抽象主題(Subject):主題是一個介面,該介面規定了具體主題需要實現的方法,比如,新增、刪除觀察者以及通知觀察者更新資料的方法。
(2)抽象觀察者(Observer):觀察者是一個介面,該介面規定了具體觀察者用來更新資料的方法。
(3)具體主題(ConcreteSubject):具體主題是實現主題介面類的一個例項,該例項包含有可以經常發生變化的資料。具體主題需使用一個集合,比如ArrayList,存放觀察者的引用,以便資料變化時通知具體觀察者。
(4)具體觀察者(ConcreteObserver):具體觀察者是實現觀察者介面類的一個例項。具體觀察者包含有可以存放具體主題引用的主題介面變數,以便具體觀察者讓具體主題將自己的引用新增到具體主題的集合中,使自己成為它的觀察者,或讓這個具體主題將自己從具體主題的集合中刪除,使自己不再是它的觀察者。
觀察者模式結構的類圖如下所示:
在現實生活中,我們經常用它來“放風”,比如:上自習時,老師不在我們在玩,派出一個同學看老師,老師來了通知大家;如果該同學沒有發現老師,老師“咳咳”兩聲通知大家自己來了讓大家安靜自習,然後批評一番。
這裡的監聽的抽象主題物件是“老師是否來了”,同學們是觀察者,同學們依賴主題物件的狀態並且是一種一對多的依賴關係,同學們同時監聽主題物件的反饋結果,同學們訂閱(觀察)這個主題,在這個主題發生變化時,來更新自己:
老師來了->安靜自習,寫作業。
老師沒來->該玩玩,該吃吃。
放風的同學發現老師會通知,放風的同學沒發現,老師進入教室老師自己也會“咳咳”兩聲通知。因此“老師是否來了”這個抽象主題中,老師通知和放風的同學通知都是這個抽象主題物件的具體實現,這個抽象主題物件就是可觀察物件(可以觀察嘛~~~),這時再想想前面對於可觀察物件的定義(可呼叫的未來值或者事件的集合)是不是明白了?
同樣上面的RxJS的程式碼也是這種效應,Rx.Observable.fromEvent(button, 'click')
是一個建立一個點選事件的可觀察物件,然後使用subscribe(() => console.log('Clicked!'));
訂閱這個可觀察物件,其中() => console.log('Clicked!')
是一個觀察者,如果發生點選事件主題物件的狀態會發生改變,而他則會被執行。
這樣有什麼好處呢?
- 我們只需要針對可觀察物件這一抽象的主題物件介面程式設計,減少了與具體的耦合,即他只是一個抽象的通知者。
- 觀察者只依賴主題物件的狀態,這意味著維持各個觀察者的一致性,但又保證了各個觀察者是相互獨立的。
釋出-訂閱
Observables(可觀察物件)
以惰性的方式推送多值的集合。
示例 - 當訂閱下面程式碼中的 Observable 的時候會立即(同步地)推送值1、2、3,然後1秒後會推送值4,再然後是完成流(即完成推送):
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
要呼叫 Observable 並看到這些值,我們需要訂閱 Observable:
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');
結果如下:
just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done
拉取 (Pull) vs. 推送 (Push)
拉取和推送是兩種不同的協議,用來描述資料生產者 (Producer)
如何與資料消費者 (Consumer)
如何進行通訊的。
什麼是拉取? - 在拉取體系中,資料的消費者
決定何時從資料生產者
那裡獲取資料,而資料生產者
自身並不會意識到什麼時候資料將會被髮送給資料消費者
。
每個 JavaScript 函式都是拉取體系。函式是資料的生產者
,呼叫該函式的程式碼通過從函式呼叫中“取出”一個單個返回值來對該函式進行消費(return 語句)。
ES2015 引入了生成器generator 函式
和 迭代器iterators (function*)
,這是另外一種型別的拉取體系。呼叫iterator.next()
的程式碼是消費者,它會從 iterator(生產者) 那“取出”多個值。
什麼是推送? -在推體系中,資料的生產者決定何時傳送資料給消費者,消費者不會在接收資料之前意識到它將要接收這個資料。
Promise(承諾)是當今JS中最常見的Push推體系,一個Promise(資料的生產者)傳送一個resolved value
(成功狀態的值)來註冊一個回撥(資料消費者),但是不同於函式的地方的是:Promise決定著何時資料才被推送至這個回撥函式。
RxJS引入了Observables(可觀察物件),一個新的 JavaScript 推送體系。一個可觀察物件是一個產生多值的生產者,並將值“推送”給觀察者(消費者)。
Function 是惰性的評估運算,呼叫時會同步地返回一個單一值。
Generator(生成器):是惰性的評估運算,在迭代時同步的返回零到無限多個值(如果有可能的話)
Promise 是最終可能(或可能不)返回單個值的運算。
Observable 是惰性的評估運算,它可以從它被呼叫的時刻起同步或非同步地返回零到(有可能的)無限多個值。
producer(生產者) | consumer (消費者) | 單個值 | 多個值 |
---|---|---|---|
pull拉 | Passive(被動的一方):被請求的時候產生資料 | Active(起主導的一方):決定何時請求資料 | Function |
push推 | Active:按自己的節奏生產資料 | Passive:對接收的資料做出反應(處理接收到的資料) | Promise |
可觀察物件(Observables):作為函式的泛化
與常見的主張相悖的是,可觀察物件不像EventEmitters(事件驅動),也不象Promises因為它可以返回多個值。可觀察物件可能會在某些情況下有點像EventEmitters(事件驅動),也即是當它們使用Subjects被多播時,但是大多數情況下,並不像EventEmitters。
可觀察物件(Observables)像是沒有引數, 但可以泛化為允許返回多個值的函式。
思考下面的程式
function foo() {
console.log('Hello');
return 42;
}
var x = foo.call(); // same as foo()
console.log(x); // "Hello" 42
var y = foo.call(); // same as foo()
console.log(y); // "Hello" 42
使用Observables得到同樣的結果
var foo=Rx.Observable.create(function(observer){
console.log('Hello');
observer.next(42);
});
foo.subscribe(function(x){
console.log(x);
});
foo.subscribe(function (y){
console.log(y);
});
得到同樣的輸出
"Hello" 42 "Hello" 42
這是因為函式和可觀察物件均是惰性計算。
如果你不呼叫call()
函式,console.log('Hello')
將不會發生。可觀察物件同樣如此,如果你不呼叫subscribe()
函式訂閱,console.log(‘Hello’)也將不會發生。
此外,call()
或者subscribe()
是一個獨立的操作:兩次call()函式呼叫觸發兩個獨立副作用,兩次subscribe()訂閱觸發兩個獨立的副作用。相反的,EventEmitters(事件驅動)共享副作用並且無論是否存在訂閱者都會盡早執行,Observables 與之相反,不會共享副作用並且是延遲執行。
訂閱一個可觀察物件類似於呼叫一個函式。
一些人認為可觀察物件是非同步的。這並不確切,如果你用一些log語句包圍在訂閱程式的前後:
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
輸出如下:
"before"
"Hello"
42
"after"
以上可以顯示對foo的訂閱是完全同步的,就像呼叫一個函式。
可觀察物件以同步或者非同步的方式傳送多個值。
那它和普通函式有哪些不同之處呢?
可觀察物件可以隨時間”return”多個值。然而函式卻做不到,你不能夠使得如下的情況發生:
function foo() {
console.log('Hello');
return 42;
return 100; // dead code. will never happen
}
函式僅僅可以返回一個值,然而,不要驚訝,可觀察物件卻可以做到這些:
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
observer.next(100); // "return" another value
observer.next(200); // "return" yet another
});
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
同步輸出:
"before"
"Hello"
42
100
200
"after"
當然,你也可以以非同步的方式返回值:
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
observer.next(100);
observer.next(200);
setTimeout(() => {
observer.next(300); // happens asynchronously
}, 1000);
});
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
同步輸出:
"before"
"Hello"
42
100
200
"after"
300
總結:
- fun.call()意味著“同步地給我一個值”
- observable.subscribe()意味著“給我任意多個值,同步也好非同步也罷。”
Observable 剖析
Observables(可觀察物件) 是使用 Rx.Observable.create 或建立操作符建立的,並使用觀察者來訂閱它,然後執行它併發送 next / error / complete 通知給觀察者,而且執行可能會被清理。這四個方面全部編碼進 Observables 例項中,但某些方面是與其他型別相關的,像 Observer (觀察者) 和 Subscription (訂閱)。
Observable 的核心關注點:
-建立 Observables(可觀察物件)
-訂閱 Observables(可觀察物件)
-執行 Observables(可觀察物件)
-清理 Observables(可觀察物件)
建立 Observables(可觀察物件)
Rx.Observable.create
是 Observable
建構函式的別名,它接收一個引數:subscribe
函式。
下面的示例建立了一個 Observable(可觀察物件),它每隔一秒會向觀察者傳送字串 ‘hi’ 。
var observable = Rx.Observable.create(function subscribe(observer) { // 通常我們會像之前的案例一樣,省略subscribe這個名字
var id = setInterval(() => {
observer.next('hi')
}, 1000);
});
Observables 可以使用 create 來建立, 但通常我們使用所謂的建立操作符, 像 of、from、interval、等等。
在上面的示例中,subscribe 函式是用來描述 Observable 最重要的一塊。我們來看下訂閱是什麼意思。
訂閱 Observables
示例中的 Observable 物件建立的 observable 可以訂閱,像這樣:
observable.subscribe(x => console.log(x));
observable.subscribe
和 Observable.create(function subscribe(observer) {...})
中的 subscribe
有著同樣的名字,這並不是一個巧合。在Rx
庫中,它們是不同的。但從實際出發,你可以認為在概念上它們是等同的。
subscribe 呼叫
在同一 Observable(可觀察物件)
的多個觀察者之間是不共享的。當使用一個觀察者呼叫 observable.subscribe 時,Observable.create(function subscribe(observer) {…}) 中的 subscribe 函式只服務於給定的觀察者。對 observable.subscribe 的每次呼叫都會觸發針對給定觀察者的獨立設定。
訂閱 Observable 像是呼叫函式, 並提供接收資料的回撥函式。
這與像 addEventListener / removeEventListener 這樣的事件處理方法 API 是完全不同的。使用 observable.subscribe,在 Observable 中不會將給定的觀察者註冊為監聽器。Observable 甚至不會去維護一個附加的觀察者列表。
subscribe 呼叫是啟動 “Observable 執行”的一種簡單方式, 並將值或事件傳遞給本次執行的觀察者。
整體性案例:
程式先同步執行,過1s之後執行非同步,之後點選按鈕執行事件。
var observable = Rx.Observable.create(function subscribe (observer) { // 建立Observable
console.log('start-----------')
observer.next(42) // 同步執行
observer.next(100)
observer.next(200)
setTimeout(() => { // 非同步執行
observer.next(300)
}, 1000)
var button = document.getElementById('rx-eventListener')
button.addEventListener('click', () => { // 不知何時執行
console.log('Clicked!')
observer.next('Clicked-end')
})
console.log('end-------------')
})
observable.subscribe(x => { // 訂閱Observable
console.log('觀察者1')
console.log(x)
})
observable.subscribe(x => { // 訂閱Observable
console.log('觀察者2')
console.log(x)
})
結果如下:
這裡我們可以看到觀察者1
和觀察者2
雖然從釋出者那裡拿到的值是一樣的,但是每個觀察者都是相互獨立的。
執行 Observables
Observable.create(function subscribe(observer) {...})
中...
的程式碼表示 “Observable 執行”,它是惰性運算,只有在每個觀察者訂閱後才會執行。隨著時間的推移,執行會以同步或非同步的方式產生多個值。
Observable 執行可以傳遞三種類型的值:
-“Next” 通知: 傳送一個值,比如數字、字串、物件,等等。
-“Error” 通知: 傳送一個 JavaScript 錯誤 或 異常。
-“Complete” 通知: 不再發送任何值。
“Next” 通知是最重要,也是最常見的型別:它們表示傳遞給觀察者的實際資料。”Error” 和 “Complete” 通知可能只會在 Observable 執行期間發生一次,並且只會執行其中的一個。
這些約束用所謂的 Observable 語法或合約表達最好,寫為正則表示式是這樣的:
next*(error|complete)?
在 Observable 執行中, 可能會發送零個到無窮多個 “Next” 通知。如果傳送的是 “Error” 或 “Complete” 通知的話,那麼之後不會再發送任何通知了。
下面是 Observable 執行的示例,它傳送了三個 “Next” 通知,然後是 “Complete” 通知:
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
Observable 嚴格遵守自身的規約,所以下面的程式碼不會發送 “Next” 通知 4:
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
observer.next(4); // 因為違反規約,所以不會發送
});
在 subscribe 中用 try/catch 程式碼塊來包裹任意程式碼是個不錯的主意,如果捕獲到異常的話,會發送 “Error” 通知:
var observable = Rx.Observable.create(function subscribe(observer) {
try {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
} catch (err) {
observer.error(err); // 如果捕獲到異常會發送一個錯誤
}
});
整體性案例:
var observable = Rx.Observable.create(function subscribe (observer) {
try {
observer.next(1)
observer.next(2)
observer.next(3)
observer.complete() // 不再發送任何值
observer.next(4) // 因為違反規約,所以不會發送
} catch (err) {
observer.error(err) // 如果捕獲到異常會發送一個 JavaScript 錯誤 或 異常
}
})
observable.subscribe(x => { // 正常
console.log('觀察者-正常')
console.log(x)
})
observable.subscribe(x => { // 模擬異常
throw new Error('丟擲一個異常')
console.log('觀察者-異常')
console.log(x)
})
清理 Observable 執行
因為 Observable 執行可能會是無限的,並且觀察者通常希望能在有限的時間內中止執行,所以我們需要一個 API 來取消執行。因為每個執行都是其對應觀察者專屬的,一旦觀察者完成接收值,它必須要一種方法來停止執行,以避免浪費計算能力或記憶體資源。(每一個觀察者都是互相獨立的)
當呼叫了 observable.subscribe ,觀察者會被附加到新建立的Observable 執行
中。這個呼叫還返回一個物件,即 Subscription (訂閱):
var subscription = observable.subscribe(x => console.log(x));
Subscription 表示進行中的執行,它有最小化的 API 以允許你取消執行。使用 subscription.unsubscribe()
你可以取消進行中的執行:
var observable = Rx.Observable.from([10, 20, 30])
var subscription = observable.subscribe(x => console.log(x)) // 10 20 30
subscription.unsubscribe() // 同步執行完成立馬清除
當你訂閱了 Observable,你會得到一個 Subscription ,它表示進行中的執行。只要呼叫 unsubscribe() 方法就可以取消執行。
當我們使用 create() 方法建立 Observable 時,Observable 必須定義如何清理執行的資源。你可以通過在 function subscribe() 中返回一個自定義的 unsubscribe 函式。
舉例來說,這是我們如何清理使用了 setInterval 的 interval 執行集合:
var cont = 0
var setInterObs = Rx.Observable.create(function subscribe(observer) {
// 追蹤 interval 資源
var intervalID = setInterval(() => {
cont++
observer.next('hi')
}, 1000)
// 提供取消和清理 interval 資源的方法
return function unsubscribe() {
cont = 0
clearInterval(intervalID)
}
})
var unsubscribe = setInterObs.subscribe(function (x) { // 如果不使用箭頭函式,回撥中的 this 代表 subscription
if (cont < 10) {
console.log(x)
} else {
console.log(this)
console.log('清除')
this.unsubscribe()
console.log(cont) // 我們可以發現cont被重置為0,這表明從 subscribe 返回的 unsubscribe 在概念上也等同於 subscription.unsubscribe。
}
})
執行結果如圖:
我們可以從執行結果裡看到 subscription
的unsubscribe方法作為 subscription
的私有方法 _unsubscribe
。
並且從 console.log(cont)
等於0,我們可以知道正如 observable.subscribe
類似於 Observable.create(function subscribe() {...})
,從 subscribe
返回的 unsubscribe
在概念上也等同於 subscription.unsubscribe
。(即我們在執行subscription.unsubscribe
時,從 subscribe
返回的 unsubscribe
也是會被執行的)
事實上,如果我們拋開圍繞這些概念的 ReactiveX 型別,也就只剩下更加直觀的JavaScript。程式碼如下:
function subscribe (observer) {
var intervalID = setInterval(() => {
observer.next('hello')
}, 1000)
return function unsubscribe() {
clearInterval(intervalID)
}
}
var unsubscribe = subscribe({next: (x) => console.log(x)})
unsubscribe() // 清理資源
為什麼我們要使用像 Observable、Observer 和 Subscription 這樣的 Rx 型別?原因是保證程式碼的安全性(比如 Observable 規約)和操作符的可組合性。
結語
這裡我們大致已經瞭解了Rxjs的可觀察物件(Observables)。
再回顧一下前面的內容:
觀察者模式
又叫釋出-訂閱(Publish/Subscribe)模式
:
他定義了一種一對多的依賴關係,讓多個觀察者物件同時監聽某一個主題物件。這個主題物件在狀態發生變化時,會通知所有觀察者物件,使他們能夠自動更新自己。而且各個觀察者之間相互獨立。
這樣的好處是:
- 我們只需要針對可觀察物件這一抽象的主題物件介面程式設計,減少了與具體的耦合,即他只是一個抽象的通知者。
- 觀察者只依賴主題物件的狀態,這意味著維持各個觀察者的一致性,但又保證了各個觀察者是相互獨立的。
producer(生產者) | consumer (消費者) | 單個值 | 多個值 |
---|---|---|---|
pull拉 | Passive(被動的一方):被請求的時候產生資料 | Active(起主導的一方):決定何時請求資料 | Function |
push推 | Active:按自己的節奏生產資料 | Passive:對接收的資料做出反應(處理接收到的資料) | Promise |
可觀察物件(Observables)像是沒有引數, 但可以泛化為允許返回多個值的函式。訂閱一個可觀察物件類似於呼叫一個函式。可觀察物件以同步或者非同步的方式傳送多個值。
使用:
建立:
Rx.Observable.create(function subscribe(observer) {...})
。Observables 可以使用 create 來建立, 但通常我們使用所謂的建立操作符, 像 of、from、interval、等等。如:Rx.Observable.from([10, 20, 30])
訂閱:observable.subscribe(x => {...})
。
執行:傳遞值的話,會發送”Next” 通知observer.next(value)
;結束的話,會發送 “Complete” 通知observer.complete()
;捕獲到異常的話,會發送 “Error” 通知observer.error(err)
。在 Observable 執行中, 可能會發送零個到無窮多個 “Next” 通知。如果傳送的是 “Error” 或 “Complete” 通知的話,那麼之後不會再發送任何通知了。
清理:subscription.unsubscribe()
。當你訂閱了 Observable,你會得到一個 Subscription ,它表示進行中的執行。只要呼叫unsubscribe()
方法就可以取消執行。
提示:後面還有精彩敬請期待,請大家關注我的專題:web前端。如有意見可以進行評論,每一條評論我都會認真對待。