Node 中的 events.EventEmitter 模組
http://imweb.io/topic/5973722452e1c21811630609
// 中文api
http://nodejs.cn/api/stream.html#stream_readable_pipe_destination_options
// 關於 stream.pipe 你需要知道更多
http://blog.kankanan.com/article/51734e8e-stream.pipe-4f609700898177e5905366f4591a.html
[上面這篇文章有我關心的問題,即當用stream.pipe時,是否需要手動關閉流,結論是需要手動關閉,但是像上面文章的寫法,在實際中好像並沒有真正的被呼叫,也沒有destroy,
stream.pipe(res).once('close', function () { stream.destroy(); });
所以換個寫法是好用的,不確定是因為版本的問題還是什麼原因
stream.pipe(res);
stream.on('close', function () { stream.destroy(); });
]
//Node.js中不可不精的Stream
https://juejin.im/post/5b14aa536fb9a01e681056e5
// 深入理解 Node.js Stream 內部機制, 下面的文章比較深入
http://taobaofed.org/blog/2017/08/31/nodejs-stream/
下面這篇文章的例子很不錯
Node 中的許多核心 API 都是通過事件驅動的非同步架構實現的,具體來說就是當 emitters
傳送事件後,相應的響應函式( listeners
)會被執行。例如:net.Server
會在每次收到連線時發出事件,fs.ReadStram
會在檔案開啟時發出事件,stram
會在有資料可讀時發出事件。 所有這些物件都是 EventEmitter
的例項,它們通過向外暴露的 eventEmitter.on()
基本的使用
on 和 emit 方法
events 模組有且只有一個物件 events.EventEmitter
,它的核心功能就是事件的觸發(emit
)和事件的監聽(on
),一個簡單的例子如下:
const EventEmitter = require('events');
let emitter = new EventEmitter();
emitter.on('hi', (name) => {
console.log(`hi, my name is ${name}!`);
});
emitter.emit('hi', 'elvin');
在上述的例子中,我們通過 emitter.on('hi', func)
的方式註冊了 hi 事件的監聽函式,通過 emitter.emit('hi', 'elvin')
的方式觸發了 hi 事件,且會向事件處理函式傳遞引數 'elvin',所以最後的執行結果為 hi, my name is elvin!
。 這裡需要說明的時,EventEmitter 還有一個 addeListener
的方法,它只不過是 on
方法的別名,兩者沒有任何區別。
once 方法
有些時候,我們希望某些事件響應函式只被執行一次,這個時候就可以使用 once()
方法,它會和 on()
一樣註冊事件的響應函式,不過當響應函式執行一次之後,就會將其移除。
const EventEmitter = require('events');
let emitter = new EventEmitter();
emitter.once('hi', (name) => {
console.log(`hi, my name is ${name}!`);
});
emitter.emit('hi', 'elvin');
emitter.emit('hi', 'leonard');
上面的例子中只會輸出 hi, my name is elvin!
(leonard 很高冷,不屑於向你打招呼┗( ▔, ▔ )┛)。
prependListener 方法
當一個事件綁定了多個響應函式時,會按照函式繫結的順序依次執行,除非響應函式是通過 prependListener()
方法繫結的,它使用的方式和 on()
類似,不過會將響應函式插到當前該事件處理函式佇列的頭部,具體的例子如下:
const EventEmitter = require('events');
let emitter = new EventEmitter();
emitter.on('hi', (name) => {
console.log(`my name is ${name}!`);
});
emitter.on('hi', () => {
console.log('I\'m from Wuhan.');
});
emitter.prependListener('hi', () => {
console.log('nice to meet you!');
});
emitter.on('hi', () => {
console.log('What\'s your name?');
});
emitter.emit('hi', 'elvin');
// 輸出為:
// nice to meet you!
// my name is elvin.
// I'm from Wuhan.
// What\'s your name?
響應函式的數量
因為繫結過多的響應函式會消耗大量的記憶體,所以為了避免記憶體洩漏,在 Event.EventEmitter
中一個事件可以繫結的響應函式數量是存在限制的,相關的屬性和方法如下:
- EventEmitter.defaultMaxListeners: 預設值為10, 表示每個事件的最多可以繫結的響應函式數量。需要注意的是,當修改它時,會影響所有
EventEmitter
的例項。 - emitter.listenerCount(eventName):獲取事件
eventName
已繫結的響應函式個數。 - emitter.setMaxListeners(n):修改 emitter 的每個事件最多可以繫結的響應函式數量,該方法會修改
emitter._maxListeners
的值,其優先順序大於*EventEmitter.defaultMaxListeners
。 - emitter.getMaxListeners():獲取 emitter 每個事件最多可以繫結的響應函式數量。
其他相關方法
EventEmitter
還有一些其他的方法和屬性,這裡就不做具體介紹,簡要地說一下。
- emitter.eventNames():返回當前已經繫結響應函式的事件名組成的陣列。
- emitter.listeners(eventName):返回
eventName
事件的響應函式組成的陣列。 - emitter.prependOnceListener(eventName, listener):類似於
once()
,不過會將響應函式插到當前該事件處理函式佇列的頭部。 - emitter.removeAllListeners([eventName]):移除
eventName
事件所有的響應函式。當未傳入eventName
引數時,所有事件的響應函式都會被移除。 - emitter.removeListener(eventName, listener):移除
eventName
事件的響應函式listener
。
newListener 和 removeListener 事件
當 emitter 被註冊響應函式時,會觸發 newListener
事件;被移除響應函式時,會觸發 removeListener
事件。兩個事件的響應函式會被傳入兩個引數:註冊的事件名和響應的響應函式。具體的例子如下:
const EventEmitter = require('events');
let emitter = new EventEmitter();
// 注意:此處使用 emitter.on 方法的話會陷入迴圈呼叫,導致棧溢位
emitter.once('newListener', (event, listener) => {
if (event === 'hi') {
emitter.on('hi', () => {
console.log('Nice to meet you.');
});
}
});
emitter.on('hi', (name) => {
console.log(`My name is ${name}!`);
});
emitter.emit('hi', 'elvin');
執行結果為 Nice to meet you. My name is elvin.
。實際上, newListener
事件被觸發時,響應函式還未被註冊至 emitter,因而我們就可以在在目標響應函式之前插入其他響應函式,例如上面的例子中 Nice to meet you.
就在 My name is elvin.
之前進行輸出。
在 Node 原始碼中的使用
如在開頭所說,net.Server
、fs.ReadStram
、stream
等 Node 內建物件都是 EventEmitter
的例項,它們通過向外暴露的 eventEmitter.on()
介面從而讓不同的事件響應函式得以執行。這裡以 stream
的部分原始碼為例,講講 events.EventEmitter
在 Node 中的使用。
// stream 部分原始碼 2017/02/22 版
// source: https://github.com/nodejs/node/blob/master/lib/internal/streams/legacy.js
const EE = require('events');
const util = require('util'); // Node 的輔助函式庫
function Stream() {
EE.call(this);
}
util.inherits(Stream, EE);
Stream.prototype.pipe = function(dest, options) {
var source = this;
function ondata(chunk) {
if (dest.writable) {
if (false === dest.write(chunk) && source.pause) {
source.pause();
}
}
}
source.on('data', ondata);
function ondrain() {
if (source.readable && source.resume) {
source.resume();
}
}
dest.on('drain', ondrain);
// If the 'end' option is not supplied, dest.end() will be called when
// source gets the 'end' or 'close' events. Only dest.end() once.
if (!dest._isStdio && (!options || options.end !== false)) {
source.on('end', onend);
source.on('close', onclose);
}
var didOnEnd = false;
function onend() {
if (didOnEnd) return;
didOnEnd = true;
dest.end();
}
function onclose() {
if (didOnEnd) return;
didOnEnd = true;
if (typeof dest.destroy === 'function') dest.destroy();
}
// don't leave dangling pipes when there are errors.
function onerror(er) {
cleanup();
if (EE.listenerCount(this, 'error') === 0) {
throw er; // Unhandled stream error in pipe.
}
}
source.on('error', onerror);
dest.on('error', onerror);
// remove all the event listeners that were added.
function cleanup() {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);
source.removeListener('end', onend);
source.removeListener('close', onclose);
source.removeListener('error', onerror);
dest.removeListener('error', onerror);
source.removeListener('end', cleanup);
source.removeListener('close', cleanup);
dest.removeListener('close', cleanup);
}
source.on('end', cleanup);
source.on('close', cleanup);
dest.on('close', cleanup);
dest.emit('pipe', source);
// Allow for unix-like usage: A.pipe(B).pipe(C)
return dest;
};
module.exports = Stream;
上述程式碼主要完成了三件事情:
- 通過在 Stream 的建構函式中呼叫
EE.call(this)
和利用util.inherits(Stream, EE);
呼叫,讓 Sever 繼承自 EventEmitter; - 通過 on 方法在資料來源 sourse 上註冊了 data、end、close、error 等事件的響應函式,在資料目的源 dest 上註冊了 drain、end、close、error 等事件的響應函式;
- 在完成初始化和響應函式註冊後,向資料目的源發出 pipe 事件。
ES6 中的使用方式
如上節所示,在 Node 中都是通過 util.inherits(Stream, EventEmitter);
來實現繼承,但是在 Node 的官方文件中,該種方式已不被推薦。更推薦的做法是通過 ES6 中的 class
和 extends
來實現繼承:
const EventEmitter = require('events');
class MyStream extends EventEmitter {
write(data) {
this.emit('data', data);
}
}
const stream = new MyStream();
stream.on('data', (data) => {
console.log(`Received data: "${data}"`);
});
stream.write('With ES6');
// output: Received data: "With ES6"
寫在最後
事件驅動、非非同步 I/O 的特點成就瞭如今的 Node,而 Node 中事件驅動依靠的就是 events.EventEmitter!需要說明的是,events 和 events.EventEmitter 其實指向的都是 EventEmitter,之所以會有 events.EventEmitter 只是為了保持對 Node 0.10.x 版本的相容。