node中的精髓Stream(流)
https://www.jianshu.com/p/2c76ef653af6
2018.02.03 16:34 字數 1387 閱讀 80評論 0喜歡 0
在前端工程化中產生了很多工具,例如grunt,gulp,webpack,babel...等等,這些工具都是通過node中的stream實現。
在node中stream也是非常非常非常重要的模組,比如我們常用的console就是基於stream的例項,還有net,http等核心模組都是基於stream來實現的,可見stream是多麼的重要。
1.什麼是stream?
是一種資料傳輸手段,從一個地方傳輸到另一個地方。
在寫node的時候會存在讀取檔案,比如現在我們有一個非常大的檔案,50G吧
const fs = require('fs');
// test檔案50個G
fs.readFileSync('./test.text');
這個時候需要消耗大量的時候去讀取這個檔案,然而我們可能關心的並不是檔案所有內容,還會存在直接讀取失敗。stream就是為了解決這些問題而產生,我們讀一些資料處理一些資料,當讀到所關心資料的時候,則可以不再繼續讀取。
stream翻譯成中文‘流’,就像水一樣,從水龍頭流向水杯。
2. Stream模組
stream繼承於EventEmitter,擁有事件觸發和事件監聽功能。主要分為4種基本流型別:
- Readable (可讀流)
- Writable (可寫流)
- Duplex (讀寫流)
- Transform (轉換流)
在流中預設可操作的型別string和Buffer,如果需要處理其他型別的js值需要傳入引數objectMode: true(預設為false)
在流中存在一個重要的概念,快取區,就像拿水杯去接水,水杯就是快取區,當水杯滿,則會關閉水龍頭,等把水杯裡面的水消耗完畢,再開啟水龍頭去接水。
stream預設快取區大小為16384(16kb),可以通過highWaterMark引數設定快取區大小,但設定encoding後,以設定的字元編碼為單位衡量。
3. Readable
首先建立一個可讀流,可接收5個引數:
- highWaterMark 快取區位元組大小,預設16384
- encoding 字元編碼,預設為null,就是buffer
- objectMode 是否操作js其他型別 預設false
- read 對內部的_read()方式實現 子類實現,父類呼叫
- destroy 對內部的_ destroy()方法實現 子類實現,父類呼叫
可讀流中分為2種模式流動模式和暫停模式。
監聽data事件,觸發流動模式,會源源不斷生產資料觸發data事件:
const { Readable } = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
// 這裡傳入的read方法,會被寫入_read()
read: (size) => {
// size 為highWaterMark大小
// 在這個方法裡面實現獲取資料,讀取到資料呼叫rs.push([data]),如果沒有資料了,push(null)結束流
if (i < 10) {
rs.push(`當前讀取資料: ${i++}`);
} else {
rs.push(null);
}
},
// 原始碼,可覆蓋
destroy(err, cb) {
rs.push(null);
cb(err);
}
});
rs.on('data', (data) => {
console.log(data);
// 每次push資料則觸發data事件
// 當前讀取資料: 0
// 當前讀取資料: 1
// 當前讀取資料: 2
// 當前讀取資料: 3
// 當前讀取資料: 4
// 當前讀取資料: 5
// 當前讀取資料: 6
// 當前讀取資料: 7
// 當前讀取資料: 8
// 當前讀取資料: 9
})
監聽readable事件,觸發暫停模式,當流有了新資料或到了流結束之前觸發readable事件,需要顯示呼叫read([size])讀取資料:
const { Readable } = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
highWaterMark: 9,
// 這裡傳入的read方法,會被寫入_read()
read: (size) => {
// size 為highWaterMark大小
// 在這個方法裡面實現獲取資料,讀取到資料呼叫rs.push([data]),如果沒有資料了,push(null)結束流
if (i < 10) {
// push其實是把資料放入快取區
rs.push(`當前讀取資料: ${i++}`);
} else {
rs.push(null);
}
}
});
rs.on('readable', () => {
const data = rs.read(9);
console.log(data);
//
})
read([size]) size引數:
- 不傳代表讀取快取區所有資料。
- 傳入0 填充快取區, 但返回null
- size < 當前快取區資料 返回所需資料
- size > 當前快取區資料 返回null 並改變highWaterMark值
這裡的快取區資料不是指highWaterMark,獲取快取區資料大小rs._readableState.length。
流的模式可以自由切換: 通過rs._readableState.flowing的值獲取當前狀態
- null 初始狀態
- false 暫停模式
- true 流動模式
rs.pause()切換到暫停模式 rs.resume()切換到流動模式
在可讀流裡面還可以監聽其他事件:
rs.on('close', () => {
// 流關閉時或檔案關閉時觸發
})
rs.on('end', () => {
// 在流中沒有資料可供消費時觸發
})
rs.on('error', (err) => {
// 發生錯誤時候
})
4. Writable
可寫流可接受引數:
- highWaterMark 快取區位元組大小,預設16384
- decodeStrings 是否將字元編碼傳入緩衝區
- objectMode 是否操作js其他型別 預設false
- write 子類實現,供父類呼叫 實現寫入底層資料
- writev 子類實現,供父類呼叫 一次處理多個chunk寫入底層資料
- destroy 可以覆蓋父類方法,不能直接呼叫,銷燬流時,父類呼叫
- final 完成寫入所有資料時父類觸發
在實現流除了用上面直接傳入引數的方式,還可以用繼承類
class WS extends stream.Writable {
constructor() {
super({
highWaterMark: 1
});
}
_write(chunk, encoding, cb) {
console.log(this._writableState.length);
// chunk 為需要寫入的資料
// encoding 字元編碼
// cb 回撥函式, 如果寫入成功需要呼叫cb去執行下一次寫入,如果發生錯誤,可以cb(new Error([錯誤資訊]))
if (chunk.length < 4) {
fs.writeFileSync('./2.text', chunk, {
flag: 'a'
});
cb();
} else{
cb(new Error('超出4個位元組'));
}
}
}
const ws = new WS();
let i = 0;
function next() {
let flag = true;
// write() 會返回boolean false -> 快取區沒滿 true —> 已滿,需要暫停寫入資料
while(i < 10 && flag) {
flag = ws.write(`${i++}`);
console.log('flag', flag);
}
}
next();
// 當所有快取區資料已經成功寫入底層資料,快取區沒有資料了,觸發drain事件
ws.on('drain', () => {
console.log('drain');
// 繼續寫入快取區資料
next();
})
可寫流的end事件,一旦觸發end事件,後續不能再寫入資料.
ws.write('start');
ws.end('end');
ws.wrtie('test'); // 報錯 write after end
finish事件:
ws.write('start');
ws.end('end');
ws.on('finish', () => {
console.log('呼叫end方法後,並且所有資料已經寫入底層')
})
cork()與uncork(),強制所有資料先寫入快取區,直到呼叫uncork()或end(),這時一併寫入底層:
const ws = stream.Writable({
writev(chunks, encoding, cb) {
// 這時chunks為一個數組,包含所有的chunk
// 現在length為10
console.log(chunk.length);
}
});
// 寫入資料之前,強制寫入資料放入快取區
ws.cork();
// 寫入資料
for (let i = 0; i < 10; i++) {
ws.write(i.toString());
}
// 寫入完畢,可以觸發寫入底層
ws.uncork();
5. Duplex
讀寫流,該方法繼承了可寫流和可讀流,但相互之間沒有關係,各自獨立快取區,擁有Writable和Readable所有方法和事件,同時實現_read()和_write()方法。
const fs = require('fs');
const stream = require('stream');
const duplex = stream.Duplex({
write(chunk, encoding, cb) {
console.log(chunk.toString('utf8')); // 寫入
},
read() {
this.push('讀取');
this.push(null);
}
});
console.log(duplex.read(6).toString('utf8')); // 讀取
duplex.write('寫入');
6. Transform
轉換流,這個流在前端工程化中用到最多,從一個地方讀取資料,轉換資料後輸出到一個地方,該流繼承於Duplex。
const fs = require('fs');
const stream = require('stream');
const transform = stream.Transform({
transform(chunk, encoding, cb){
// 把資料轉換成大寫字母,然後push到快取區
this.push(chunk.toString().toUpperCase());
cb();
}
});
transform.write('a');
console.log(transform.read(1).toString()); // A
7. fs快速建立可讀/可寫流
可讀流和可寫流都需要我們去實現父類的方法,那麼fs這個模組幫我們做了這件事情,fs裡面實現了高效並且可靠的可讀/可寫流,提供快速建立流,不再去實現父類_write()或_read()。下面我們來看看如何使用:
const fs = require('fs');
/**
* 建立可讀流
*
* 第一個引數檔案路徑
*
* 第二個引數為options
*
flags?: string;
encoding?: string; 字元編碼
fd?: number; 檔案開啟後的識別符號
mode?: number; 檔案的許可權
autoClose?: boolean; 讀取完畢後,是否自動關閉檔案
start?: number; 從哪個位置開始讀取
end?: number; 讀到什麼時候結束
highWaterMark?: number; 最高水位線
*/
const rs = fs.createReadStream('1.text');
rs.on('data', data => {
console.log(data);
})
/**
* 建立可寫流
*
* 第一個引數檔案路徑
*
* 第二個引數為options
*
flags?: string;
encoding?: string; 字元編碼
fd?: number; 檔案開啟後的識別符號
mode?: number; 檔案的許可權
autoClose?: boolean; 寫入完畢後,是否自動關閉檔案
start?: number; 從什麼位置開始寫入
*/
const ws = fs.createWriteStream('2.text');
ws.write('123');
8. pipe
在流中搭建一條管道,從可讀流中到可寫流。
可讀流中有pipe()方法,在可寫流中可以監聽pipe事件,下面實現了從可讀流中通過管道到可寫流:
const fs = require('fs');
const stream = require('stream');
const rs = stream.Readable({
read() {
this.push(fs.readFileSync('./1.text')); // 檔案內容 test
this.push(null);
}
});
const ws = stream.Writable({
write(chunk, encoding, cb) {
// chunk為test buffer
fs.writeFileSync('./2.text', chunk.toString());
cb();
}
});
ws.on('pipe', data => {
// 觸發pipe事件
console.log(data);
});
rs.pipe(ws);
9. 總結
流分為四種基本型別,兩種模式。流中的資料不是直接寫入或讀取,有快取區的概念。