1. 程式人生 > >從釋出訂閱模式入手讀懂Node.js的EventEmitter原始碼

從釋出訂閱模式入手讀懂Node.js的EventEmitter原始碼

前面一篇文章setTimeout和setImmediate到底誰先執行,本文讓你徹底理解Event Loop詳細講解了瀏覽器和Node.js的非同步API及其底層原理Event Loop。本文會講一下不用原生API怎麼達到非同步的效果,也就是釋出訂閱模式。釋出訂閱模式在面試中也是高頻考點,本文會自己實現一個釋出訂閱模式,弄懂了他的原理後,我們就可以去讀Node.js的EventEmitter原始碼,這也是一個典型的釋出訂閱模式。

本文所有例子已經上傳到GitHub,同一個repo下面還有我所有博文和例子:

https://github.com/dennis-jiang/Front-End-Knowledges/tree/master/Examples/DesignPatterns/PubSub

為什麼要用釋出訂閱模式

在沒有Promise之前,我們使用非同步API的時候經常會使用回撥,但是如果有幾個互相依賴的非同步API呼叫,回撥層級太多可能就會陷入“回撥地獄”。下面程式碼演示了假如我們有三個網路請求,第二個必須等第一個結束才能發出,第三個必須等第二個結束才能發起,如果我們使用回撥就會變成這樣:

const request = require("request");

request('https://www.baidu.com', function (error, response) {
  if (!error && response.statusCode == 200) {
    console.log('get times 1');

    request('https://www.baidu.com', function(error, response) {
      if (!error && response.statusCode == 200) {
        console.log('get times 2');

        request('https://www.baidu.com', function(error, response) {
          if (!error && response.statusCode == 200) {
            console.log('get times 3');
          }
        })
      }
    })
  }
});

由於瀏覽器端ajax會有跨域問題,上述例子我是用Node.js執行的。這個例子裡面有三層回撥,我們已經有點暈了,如果再多幾層,那真的就是“地獄”了。

釋出訂閱模式

釋出訂閱模式是一種設計模式,並不僅僅用於JS中,這種模式可以幫助我們解開“回撥地獄”。他的流程如下圖所示:

  1. 訊息中心:負責儲存訊息與訂閱者的對應關係,有訊息觸發時,負責通知訂閱者
  2. 訂閱者:去訊息中心訂閱自己感興趣的訊息
  3. 釋出者:滿足條件時,通過訊息中心釋出訊息

有了這種模式,前面處理幾個相互依賴的非同步API就不用陷入"回撥地獄"了,只需要讓後面的訂閱前面的成功訊息,前面的成功後釋出訊息就行了。

自己實現一個釋出訂閱模式

知道了原理,我們自己來實現一個釋出訂閱模式,這次我們使用ES6的class來實現,如果你對JS的面向物件或者ES6的class還不熟悉,請看這篇文章:

class PubSub {
  constructor() {
    // 一個物件存放所有的訊息訂閱
    // 每個訊息對應一個數組,陣列結構如下
    // {
    //   "event1": [cb1, cb2]
    // }
    this.events = {}
  }

  subscribe(event, callback) {
    if(this.events[event]) {
      // 如果有人訂閱過了,這個鍵已經存在,就往裡面加就好了
      this.events[event].push(callback);
    } else {
      // 沒人訂閱過,就建一個數組,回撥放進去
      this.events[event] = [callback]
    }
  }

  publish(event, ...args) {
    // 取出所有訂閱者的回撥執行
    const subscribedEvents = this.events[event];

    if(subscribedEvents && subscribedEvents.length) {
      subscribedEvents.forEach(callback => {
        callback.call(this, ...args);
      });
    }
  }

  unsubscribe(event, callback) {
    // 刪除某個訂閱,保留其他訂閱
    const subscribedEvents = this.events[event];

    if(subscribedEvents && subscribedEvents.length) {
      this.events[event] = this.events[event].filter(cb => cb !== callback)
    }
  }
}

解決回撥地獄

有了我們自己的PubSub,我們就可以用它來解決前面的毀掉地獄問題了:

const request = require("request");
const pubSub = new PubSub();

request('https://www.baidu.com', function (error, response) {
  if (!error && response.statusCode == 200) {
    console.log('get times 1');
    // 釋出請求1成功訊息
    pubSub.publish('request1Success');
  }
});

// 訂閱請求1成功的訊息,然後發起請求2
pubSub.subscribe('request1Success', () => {
  request('https://www.baidu.com', function (error, response) {
    if (!error && response.statusCode == 200) {
      console.log('get times 2');
      // 釋出請求2成功訊息
      pubSub.publish('request2Success');
    }
  });
})

// 訂閱請求2成功的訊息,然後發起請求3
pubSub.subscribe('request2Success', () => {
  request('https://www.baidu.com', function (error, response) {
    if (!error && response.statusCode == 200) {
      console.log('get times 3');
      // 釋出請求3成功訊息
      pubSub.publish('request3Success');
    }
  });
})

Node.js的EventEmitter

Node.js的EventEmitter思想跟我們前面的例子是一樣的,不過他有更多的錯誤處理和更多的API,原始碼在GitHub上都有:https://github.com/nodejs/node/blob/master/lib/events.js。我們挑幾個API看一下:

建構函式

程式碼傳送門: https://github.com/nodejs/node/blob/master/lib/events.js#L64

建構函式很簡單,就一行程式碼,主要邏輯都在EventEmitter.init裡面:

EventEmitter.init裡面也是做了一些初始化的工作,this._events跟我們自己寫的this.events功能是一樣的,用來儲存訂閱的事件。核心程式碼我在圖上用箭頭標出來了。這裡需要注意一點,如果一個型別的事件只有一個訂閱,this._events就直接是那個函數了,而不是一個數組,在原始碼裡面我們會多次看到對這個進行判斷,這樣寫是為了提高效能。

訂閱事件

程式碼傳送門: https://github.com/nodejs/node/blob/master/lib/events.js#L405

EventEmitter訂閱事件的API是onaddListener,從原始碼中我們可以看出這兩個方法是完全一樣的:

這兩個方法都是呼叫了_addListener,這個方法對引數進行了判斷和錯誤處理,核心程式碼仍然是往this._events裡面新增事件:

釋出事件

程式碼傳送門:https://github.com/nodejs/node/blob/master/lib/events.js#L263

EventEmitter釋出事件的API是emit,這個API裡面會對"error"型別的事件進行特殊處理,也就是丟擲錯誤:

如果不是錯誤型別的事件,就把訂閱的回撥事件拿出來執行:

取消訂閱

程式碼傳送門:https://github.com/nodejs/node/blob/master/lib/events.js#L450

EventEmitter裡面取消訂閱的API是removeListeneroff,這兩個是完全一樣的。EventEmitter的取消訂閱API不僅僅會刪除對應的訂閱,在刪除後還會emit一個removeListener事件來通知外界。這裡也會對this._events裡面對應的type進行判斷,如果只有一個,也就是說這個type的型別是function,會直接刪除這個鍵,如果有多個訂閱,就會找出這個訂閱,然後刪掉他。如果所有訂閱都刪完了,就直接將this._events置空:

觀察者模式

這裡再提一個很相似的設計模式:觀察者模式,有些文章認為他和釋出訂閱模式是一樣的,有些認為他們是有區別的。筆者認為他更像一個低配版的釋出訂閱模式,我們來實現一個看看:

class Subject {
  constructor() {
    // 一個數組存放所有的訂閱者
    // 每個訊息對應一個數組,陣列結構如下
    // [
    //   {
    //     observer: obj,
    //     action: () => {}
    //   }
    // ]
    this.observers = [];
  }

  addObserver(observer, action) {
    // 將觀察者和回撥放入陣列
    this.observers.push({observer, action});
  }

  notify(...args) {
    // 執行每個觀察者的回撥
    this.observers.forEach(item => {
      const {observer, action} = item;
      action.call(observer, ...args);
    })
  }
}

const subject = new Subject();

// 新增一個觀察者
subject.addObserver({name: 'John'}, function(msg){
  console.log(this.name, 'got message: ', msg);
})

// 再新增一個觀察者
subject.addObserver({name: 'Joe'}, function(msg) {
  console.log(this.name, 'got message: ', msg);
})

// 通知所有觀察者
subject.notify('tomorrow is Sunday');

上述程式碼的輸出是:

通過這個輸出可以看出一旦調了通知的方法notify,所有觀察者都會收到通知,而且會收到同樣的資訊。而釋出訂閱模式還可以自定義需要接受的通知,所以說觀察者模式是低配版的釋出訂閱模式。

總結

本文講解了釋出訂閱模式的原理,並自己實現了一個簡單的釋出訂閱模式。在瞭解了原理後,還去讀了Node.js的EventEmitter模組的原始碼,進一步學習了生產環境的釋出訂閱模式的寫法。總結下來發布訂閱模式有以下特點:

  1. 解決了“回撥地獄”
  2. 將多個模組進行了解耦,自己執行時,不需要知道另一個模組的存在,只需要關心釋出出來的事件就行
  3. 因為多個模組可以不知道對方的存在,自己關心的事件可能是一個很遙遠的旮旯釋出出來的,也不能通過程式碼跳轉直接找到釋出事件的地方,debug的時候可能會有點困難。
  4. 觀察者模式是低配版的釋出訂閱模式,一旦釋出通知,所有觀察者都會收到訊息,不能做到釋出訂閱那樣精細的控制。

文章的最後,感謝你花費寶貴的時間閱讀本文,如果本文給了你一點點幫助或者啟發,請不要吝嗇你的贊和GitHub小星星,你的支援是作者持續創作的動力。

作者博文GitHub專案地址: https://github.com/dennis-jiang/Front-End-Knowledges

作者掘金文章彙總:https://juejin.im/post/5e3ffc85518825494e277