每天實現一個Rxjs中的Operator之Map操作
阿新 • • 發佈:2018-12-09
在上一篇我們簡單實現了Observable.create,那麼如何在此基礎上實現Map操作呢。 對於如下程式碼,我們希望輸出2,4
Observable.create((observer) => {
observer.next(1);
observer.next(2);
})
.map(value => value * 2)
.subscribe((value) => console.log(value));
修改Observable.js,如下:
import { toSubscribe } from "./toSubscriber";
import { higherOrderMap } from "./operator/map";
class Observable {
constructor(subscribe) {
this.source = null;
this.operator = null;
if (subscribe) {
this._subscribe = subscribe;
}
}
static create(subscribe) {
return new Observable(subscribe);
}
lift(operator) {
const observable = new Observable();
observable.source = this ;
observable.operator = operator;
return observable;
}
_subscribe(subscribe) {
return this.source.subscribe(subscribe);
}
subscribe(observerOrNext) {
const { operator } = this;
const sink = toSubscribe(observerOrNext);
if (operator) {
operator.call(sink, this.source);
} else {
this._trySubscribe(sink);
}
}
_trySubscribe(sink) {
return this._subscribe(sink);
}
}
Observable.prototype.map = function (project) {
return higherOrderMap(project)(this);
};
export default Observable;
map.js:
import { Subscriber, } from "../Subscriber";
class MapSubscribe extends Subscriber {
constructor(destinationOrNext, project) {
super(destinationOrNext);
this.project = project;
}
_next(value) {
let result = this.project.call(this, value);
this.destination.next(result);
}
}
class MapOperator {
constructor(project) {
this.project = project;
}
call(subscribe, source) {
return source.subscribe(new MapSubscribe(subscribe, this.project));
}
}
export function higherOrderMap(project) {
return (source) => {
if (typeof project !== 'function') {
throw new Error('argument is not a function')
}
return source.lift(new MapOperator(project));
}
}
原理解析: 1、當呼叫Observable當map操作時,執行Observable的lift方法建立一個新的Observable,並將當前的Observable作為其source,map作為其operator 2、當呼叫subscribe方法時,判斷當前Observable是否存在operator,很顯然我們當前的operator時map,所以執行map操作的call方法 3、在call方法中會呼叫source的subscribe方法,並將mapSubscribe作為observer 4、subscribe方法中再次判斷是否有observable,顯然source是沒有operator的,所以就執行Observable.create傳遞的回撥方法 5、執行next方法的時候,會先執行mapSubscribe的next方法,然後呼叫建立時observer的next方法。也就是 mapSubscribe.next(observer.next)層層傳遞。