每天學點node系列-stream
在編寫程式碼時,我們應該有一些方法將程式像連線水管一樣連線起來 -- 當我們需要獲取一些資料時,可以去通過"擰"其他的部分來達到目的。這也應該是IO應有的方式。 -- Doug McIlroy. October 11, 1964
為什麼應該使用stream?
在node中,I/O都是非同步的,所以在和硬碟以及網路的互動過程中會涉及到傳遞迴調函式的過程。你之前可能會寫出這樣的程式碼:
var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { fs.readFile(__dirname + '/data.txt', function (err, data) { res.end(data); });}); server.listen(8000);
上面的這段程式碼並沒有什麼問題,但是在每次請求時,我們都會把整個data.txt檔案讀入到記憶體中,然後再把結果返回給客戶端。想想看,如果data.txt檔案非常大,在響應大量使用者的併發請求時,程式可能會消耗大量的記憶體,這樣很可能會造成使用者連線緩慢的問題。其次,上面的程式碼可能會造成很不好的使用者體驗,因為使用者在接收到任何的內容之前首先需要等待程式將檔案內容完全讀入到記憶體中。所幸的是,(req,res)
引數都是流物件,這意味著我們可以使用一種更好的方法來實現上面的需求:
var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server.listen(8000);
在這裡,.pipe()方法會自動幫助我們監聽data和end事件。上面的這段程式碼不僅簡潔,而且data.txt檔案中每一小段資料都將源源不斷的傳送到客戶端。
除此之外,使用.pipe()方法還有別的好處,比如說它可以自動控制後端壓力,以便在客戶端連線緩慢的時候node可以將盡可能少的快取放到記憶體中。
認識NodeJS中的stream
流(stream)是 Node.js 中處理流式資料的抽象介面。·stream
模組用於構建實現了流介面的物件。
我們用到的很多核心模組都是stream
的例項。 例如:http.clientRequest, process.stdout。
流可以是可讀的、可寫的、或者可讀可寫的。
所有的流都是 EventEmitter 的例項。
雖然我們平時開發過程中平常不會直接用到stream
模組,但是也需要了解其執行機制。
對於想要實現自定義stream例項的開發者來說,就得好好研究stream的擴充套件API了,比如gulp的內部實現就大量用到了自定義的stream型別。
stream的型別
Node.js 中有四種基本的流型別:
- Writable - 可寫入資料的流(例如 fs.createWriteStream())。
- Readable - 可讀取資料的流(例如 fs.createReadStream())。
- Duplex - 可讀又可寫的流(例如 net.Socket)。
- Transform - 在讀寫過程中可以修改或轉換資料的 Duplex 流(例如 zlib.createDeflate())。
使用Stream可實現資料的流式處理,如:
var fs = require('fs')
// `fs.createReadStream`建立一個`Readable`物件以讀取`bigFile`的內容,並輸出到標準輸出
// 如果使用`fs.readFile`則可能由於檔案過大而失敗
fs.createReadStream(bigFile).pipe(process.stdout)
Readable
Readable流可以產出資料,你可以將這些資料傳送到一個writable,transform或者duplex流中,只需要呼叫pipe()方法:
建立個readable流
var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
下面執行程式碼
$ node read.js
beep boop
在上面的程式碼中rs.push(null)的作用是告訴rs輸出資料應該結束了。
需要注意的一點是我們在將資料輸出到process.stdout之前已經將內容推送進readable流rs中,但是所有的資料依然是可寫的。這是因為在你使用.push()將資料推進一個readable流中時,一直要到另一個東西來消耗資料之前,資料都會存在一個快取中。然而,在更多的情況下,我們想要的是當需要資料時資料才會產生,以此來避免大量的快取資料。
流式消耗迭代器中的資料
我們可以通過定義一個._read函式來實現按需推送資料:
const Readable = require('stream').Readable
class ToReadable extends Readable {
constructor(iterator) {
super()
this.iterator = iterator
}
// 子類需要實現該方法
// 這是生產資料的邏輯
_read() {
const res = this.iterator.next()
if (res.done) {
// 資料來源已枯竭,呼叫`push(null)`通知流
return this.push(null)
}
setTimeout(() => {
// 通過`push`方法將資料新增到流中
this.push(res.value + '\n')
}, 0)
}
}
module.exports = ToReadable
使用時,new ToReadable(iterator)會返回一個可讀流,下游可以流式的消耗迭代器中的資料。
const iterator = function (limit) {
return {
next: function () {
if (limit--) {
return { done: false, value: limit + Math.random() }
}
return { done: true }
}
}
}(1e10)
const readable = new ToReadable(iterator)
// 監聽`data`事件,一次獲取一個數據
readable.on('data', data => process.stdout.write(data))
// 所有資料均已讀完
readable.on('end', () => process.stdout.write('DONE'))
執行上述程式碼,將會有100億個隨機數源源不斷地寫進標準輸出流。
建立可讀流時,需要繼承Readable,並實現_read方法。 * _read方法是從底層系統讀取具體資料的邏輯,即生產資料的邏輯。 * 在_read方法中,通過呼叫push(data)將資料放入可讀流中供下游消耗。 * 在_read方法中,可以同步呼叫push(data),也可以非同步呼叫。 * 當全部資料都生產出來後,必須呼叫push(null)來結束可讀流。 * 流一旦結束,便不能再呼叫push(data)新增資料。
可以通過監聽data事件的方式消耗可讀流。 * 在首次監聽其data事件後,readable便會持續不斷地呼叫_read(),通過觸發data事件將資料輸出。 * 第一次data事件會在下一個tick中觸發,所以,可以安全地將資料輸出前的邏輯放在事件監聽後(同一個tick中)。 * 當資料全部被消耗時,會觸發end事件。
上面的例子中,process.stdout代表標準輸出流,實際是一個可寫流。
Writable
一個writable流指的是隻能流進不能流出的流:
src.pipe(writableStream)
建立一個writable流
只需要定義一個._write(chunk,enc,next)函式,你就可以將一個readable流的資料釋放到其中:
const Writable = require('stream').Writable
const writable = Writable()
// 實現`_write`方法
// 這是將資料寫入底層的邏輯
writable._write = function (data, enc, next) {
// 將流中的資料寫入底層
process.stdout.write(data.toString().toUpperCase())
// 寫入完成時,呼叫`next()`方法通知流傳入下一個資料
process.nextTick(next)
}
// 所有資料均已寫入底層
writable.on('finish', () => process.stdout.write('DONE'))
// 將一個數據寫入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')
// 再無資料寫入流時,需要呼叫`end`方法
writable.end()
執行結果如下:
$ node 1.js
A
B
C
DONE
- 上游通過呼叫writable.write(data)將資料寫入可寫流中。write()方法會呼叫_write()將data寫入底層。
- _write中,當資料成功寫入底層後,必須呼叫next(err)告訴流開始處理下一個資料。
- 在從一個readable流向一個writable流傳資料的過程中,資料會自動被轉換為
Buffer
物件,除非你在建立writable流的時候制定了decodeStrings
引數為false
:Writable({decodeStrings: false})
。 - 如果你需要傳遞物件,需要指定
objectMode
引數為true
,Writable({ objectMode: true })
。 - 在end方法呼叫後,當所有底層的寫操作均完成時,會觸發finish事件。
- 上游必須呼叫writable.end(data)來結束可寫流,data是可選的。此後,不能再呼叫write新增資料。
- next的呼叫既可以是同步的,也可以是非同步的.
_write的引數:
- 第一個引數,
chunk
表寫進來的資料。 - 第二個引數
enc
代表編碼的字串,但是隻有在opts.decodeString
為false
的時候你才可以寫一個字串。 - 第三個引數,
next(err)
是一個回撥函式,使用這個回撥函式你可以告訴資料消耗者可以寫更多的資料。你可以有選擇性的傳遞一個錯誤物件error
,這時會在流實體上觸發一個emit
事件。
向一個writable流中寫東西
如果你需要向一個writable流中寫東西,只需要呼叫.write(data)即可。
process.stdout.write('beep boop\n');
為了告訴一個writable流你已經寫完畢了,只需要呼叫.end()方法。你也可以使用.end(data)在結束前再寫一些資料。
var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
},1000);
執行結果如下所示:
$ node writing.js
$ cat message.txt
beep boop
如果你在建立writable流時指定了highWaterMark引數,那麼當沒有更多資料寫入時,呼叫.write()方法將會返回false。如果你想要等待快取情況,可以監聽drain事件。
Duplex
Duplex流是一個可讀也可寫的流,就好像一個電話,可以接收也可以傳送語音。一個rpc交換是一個duplex流的最好的例子。如果你看到過下面這樣的程式碼:
a.pipe(b).pipe(a)
那麼你需要處理的就是一個duplex流物件。
實現一個Duplex
var Duplex = require('stream').Duplex
var duplex = Duplex()
// 可讀端底層讀取邏輯
duplex._read = function () {
this._readNum = this._readNum || 0
if (this._readNum > 1) {
this.push(null)
} else {
this.push('' + (this._readNum++))
}
}
// 可寫端底層寫邏輯
duplex._write = function (buf, enc, next) {
// a, b
process.stdout.write('_write ' + buf.toString() + '\n')
next()
}
// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))
duplex.write('a')
duplex.write('b')
duplex.end()
上面的程式碼中實現了_read方法,所以可以監聽data事件來消耗Duplex產生的資料。 同時,又實現了_write方法,可作為下游去消耗資料。
因為它既可讀又可寫,所以稱它有兩端:可寫端和可讀端。 可寫端的介面與Writable一致,作為下游來使用;可讀端的介面與Readable一致,作為上游來使用。
Transform
Transform stream是Duplex stream的特例,也就是說,Transform stream也同時可讀可寫。跟Duplex stream的區別點在於,Transform stream的輸出與輸入是存在相關性的。
const Transform = require('stream').Transform
class Rotate extends Transform {
constructor(n) {
super()
// 將字母旋轉`n`個位置
this.offset = (n || 13) % 26
}
// 將可寫端寫入的資料變換後新增到可讀端
_transform (buf, enc, next) {
var res = buf.toString().split('').map(c => {
var code = c.charCodeAt(0)
if (c >= 'a' && c <= 'z') {
code += this.offset
if (code > 'z'.charCodeAt(0)) {
code -= 26
}
} else if (c >= 'A' && c <= 'Z') {
code += this.offset
if (code > 'Z'.charCodeAt(0)) {
code -= 26
}
}
return String.fromCharCode(code)
}).join('')
// 呼叫push方法將變換後的資料新增到可讀端
this.push(res)
// 呼叫next方法準備處理下一個
next()
}
}
var transform = new Rotate(3)
transform.on('data', data => process.stdout.write(data))
transform.write('hello, ')
transform.write('world!')
transform.end()
執行結果如下:
$ node 1.js
khoor, zruog!
Tranform繼承自Duplex,並已經實現了_read和_write方法,同時要求使用者實現一個_transform方法。
相關連結
https://nodejs.org/api/stream.h