1. 程式人生 > >node中的精髓Stream(流)

node中的精髓Stream(流)

https://www.jianshu.com/p/2c76ef653af6

 

lihuanji 關注

2018.02.03 16:34 字數 1387 閱讀 80評論 0喜歡 0

在前端工程化中產生了很多工具,例如grunt,gulp,webpack,babel...等等,這些工具都是通過node中的stream實現。
在node中stream也是非常非常非常重要的模組,比如我們常用的console就是基於stream的例項,還有net,http等核心模組都是基於stream來實現的,可見stream是多麼的重要。

1.什麼是stream?

是一種資料傳輸手段,從一個地方傳輸到另一個地方。

在寫node的時候會存在讀取檔案,比如現在我們有一個非常大的檔案,50G吧

    const fs = require('fs');
    // test檔案50個G
    fs.readFileSync('./test.text');

這個時候需要消耗大量的時候去讀取這個檔案,然而我們可能關心的並不是檔案所有內容,還會存在直接讀取失敗。stream就是為了解決這些問題而產生,我們讀一些資料處理一些資料,當讀到所關心資料的時候,則可以不再繼續讀取。

stream翻譯成中文‘流’,就像水一樣,從水龍頭流向水杯

2. Stream模組

stream繼承於EventEmitter,擁有事件觸發和事件監聽功能。主要分為4種基本流型別:

  1. Readable (可讀流) 
  2. Writable (可寫流) 
  3. Duplex (讀寫流) 
  4. Transform (轉換流) 

    在流中預設可操作的型別string和Buffer,如果需要處理其他型別的js值需要傳入引數objectMode: true(預設為false)

在流中存在一個重要的概念,快取區,就像拿水杯去接水,水杯就是快取區,當水杯滿,則會關閉水龍頭,等把水杯裡面的水消耗完畢,再開啟水龍頭去接水。

stream預設快取區大小為16384(16kb),可以通過highWaterMark引數設定快取區大小,但設定encoding後,以設定的字元編碼為單位衡量。

3. Readable

首先建立一個可讀流,可接收5個引數:

  • highWaterMark 快取區位元組大小,預設16384
  • encoding 字元編碼,預設為null,就是buffer
  • objectMode 是否操作js其他型別 預設false
  • read 對內部的_read()方式實現 子類實現,父類呼叫
  • destroy 對內部的_ destroy()方法實現 子類實現,父類呼叫

可讀流中分為2種模式流動模式暫停模式

監聽data事件,觸發流動模式,會源源不斷生產資料觸發data事件:

    const { Readable } = require('stream');
    
    let i = 0;
        
    const rs = Readable({
        encoding: 'utf8',
        // 這裡傳入的read方法,會被寫入_read()
        read: (size) => {
            // size 為highWaterMark大小
            // 在這個方法裡面實現獲取資料,讀取到資料呼叫rs.push([data]),如果沒有資料了,push(null)結束流
            if (i < 10) {
                rs.push(`當前讀取資料: ${i++}`);
            } else {
                rs.push(null);
            }
        },
        // 原始碼,可覆蓋
        destroy(err, cb) {
            rs.push(null);
            cb(err);
        }
    });
        
    rs.on('data', (data) => {
        console.log(data);
        // 每次push資料則觸發data事件
        // 當前讀取資料: 0
        // 當前讀取資料: 1
        // 當前讀取資料: 2
        // 當前讀取資料: 3
        // 當前讀取資料: 4
        // 當前讀取資料: 5
        // 當前讀取資料: 6
        // 當前讀取資料: 7
        // 當前讀取資料: 8
        // 當前讀取資料: 9
    })

監聽readable事件,觸發暫停模式,當流有了新資料或到了流結束之前觸發readable事件,需要顯示呼叫read([size])讀取資料:

    const { Readable } = require('stream');
        
    let i = 0;
        
    const rs = Readable({
        encoding: 'utf8',
        highWaterMark: 9,
        // 這裡傳入的read方法,會被寫入_read()
        read: (size) => {
            // size 為highWaterMark大小
            // 在這個方法裡面實現獲取資料,讀取到資料呼叫rs.push([data]),如果沒有資料了,push(null)結束流
            if (i < 10) {
              // push其實是把資料放入快取區
              rs.push(`當前讀取資料: ${i++}`);
            } else {
                rs.push(null);
            }
        }
    });
    
    rs.on('readable', () => {
        const data = rs.read(9);
        console.log(data);
        // 
    })

read([size]) size引數:

  • 不傳代表讀取快取區所有資料。
  • 傳入0 填充快取區, 但返回null
  • size < 當前快取區資料 返回所需資料
  • size > 當前快取區資料 返回null 並改變highWaterMark值

這裡的快取區資料不是指highWaterMark,獲取快取區資料大小rs._readableState.length。

流的模式可以自由切換: 通過rs._readableState.flowing的值獲取當前狀態

  • null 初始狀態
  • false 暫停模式
  • true 流動模式

rs.pause()切換到暫停模式 rs.resume()切換到流動模式

在可讀流裡面還可以監聽其他事件:

    
    rs.on('close', () => {
        // 流關閉時或檔案關閉時觸發
    })
    
    rs.on('end', () => {
        // 在流中沒有資料可供消費時觸發
    })
    
    rs.on('error', (err) => {
        // 發生錯誤時候
    })

4. Writable

可寫流可接受引數:

  • highWaterMark 快取區位元組大小,預設16384
  • decodeStrings 是否將字元編碼傳入緩衝區
  • objectMode 是否操作js其他型別 預設false
  • write 子類實現,供父類呼叫 實現寫入底層資料
  • writev 子類實現,供父類呼叫 一次處理多個chunk寫入底層資料
  • destroy 可以覆蓋父類方法,不能直接呼叫,銷燬流時,父類呼叫
  • final 完成寫入所有資料時父類觸發

在實現流除了用上面直接傳入引數的方式,還可以用繼承類

class WS extends stream.Writable {
    constructor() {
        super({
            highWaterMark: 1
        });
    }

    _write(chunk, encoding, cb) {
        console.log(this._writableState.length);
        // chunk 為需要寫入的資料
        // encoding 字元編碼
        // cb 回撥函式, 如果寫入成功需要呼叫cb去執行下一次寫入,如果發生錯誤,可以cb(new Error([錯誤資訊]))
        if (chunk.length < 4) {
            fs.writeFileSync('./2.text', chunk, {
                flag: 'a'
            });
            cb();
        } else{
            cb(new Error('超出4個位元組'));
        }
    }
}

const ws = new WS();

let i = 0;
function next() {
    let flag = true;

    // write() 會返回boolean false -> 快取區沒滿 true —> 已滿,需要暫停寫入資料
    while(i < 10 && flag) {
        flag = ws.write(`${i++}`);
        console.log('flag', flag);
    }
}

next();

// 當所有快取區資料已經成功寫入底層資料,快取區沒有資料了,觸發drain事件
ws.on('drain', () => {
    console.log('drain');
    // 繼續寫入快取區資料
    next();
})

可寫流的end事件,一旦觸發end事件,後續不能再寫入資料.

    ws.write('start');
    ws.end('end');
    ws.wrtie('test'); // 報錯 write after end

finish事件:

    ws.write('start');
    ws.end('end');
    ws.on('finish', () => {
        console.log('呼叫end方法後,並且所有資料已經寫入底層')
    })

cork()與uncork(),強制所有資料先寫入快取區,直到呼叫uncork()或end(),這時一併寫入底層:

    const ws = stream.Writable({
        writev(chunks, encoding, cb) {
            // 這時chunks為一個數組,包含所有的chunk
        
            // 現在length為10
            console.log(chunk.length);
        }
    });
    
    // 寫入資料之前,強制寫入資料放入快取區
    ws.cork();
    
    // 寫入資料
    for (let i = 0; i < 10; i++) {
        ws.write(i.toString());
    }
    
    // 寫入完畢,可以觸發寫入底層
    ws.uncork();

5. Duplex

讀寫流,該方法繼承了可寫流和可讀流,但相互之間沒有關係,各自獨立快取區,擁有Writable和Readable所有方法和事件,同時實現_read()和_write()方法。

    const fs = require('fs');
    const stream = require('stream');
    
    const duplex = stream.Duplex({
        write(chunk, encoding, cb) {
            console.log(chunk.toString('utf8')); // 寫入
        },
        read() {
            this.push('讀取');
            this.push(null);
        }
    });
    
    console.log(duplex.read(6).toString('utf8')); // 讀取
    
    duplex.write('寫入');

6. Transform

轉換流,這個流在前端工程化中用到最多,從一個地方讀取資料,轉換資料後輸出到一個地方,該流繼承於Duplex。

    const fs = require('fs');
    const stream = require('stream');
    
    const transform = stream.Transform({
        transform(chunk, encoding, cb){
            // 把資料轉換成大寫字母,然後push到快取區
            this.push(chunk.toString().toUpperCase());
            cb();
        }
    });
    
    transform.write('a');
    
    console.log(transform.read(1).toString()); // A

7. fs快速建立可讀/可寫流

可讀流和可寫流都需要我們去實現父類的方法,那麼fs這個模組幫我們做了這件事情,fs裡面實現了高效並且可靠的可讀/可寫流,提供快速建立流,不再去實現父類_write()或_read()。下面我們來看看如何使用:

    const fs = require('fs');
    
    /**
     * 建立可讀流
     *  
     *  第一個引數檔案路徑
     * 
     *  第二個引數為options
     * 
        flags?: string;      
        encoding?: string;   字元編碼
        fd?: number;     檔案開啟後的識別符號
        mode?: number;    檔案的許可權
        autoClose?: boolean;   讀取完畢後,是否自動關閉檔案
        start?: number;  從哪個位置開始讀取
        end?: number;    讀到什麼時候結束
        highWaterMark?: number;   最高水位線
     */
    const rs = fs.createReadStream('1.text');
    
    rs.on('data', data => {
        console.log(data);
    })
    
    /**
     * 建立可寫流
     *  
     *  第一個引數檔案路徑
     * 
     *  第二個引數為options
     * 
        flags?: string;  
        encoding?: string; 字元編碼
        fd?: number;    檔案開啟後的識別符號
        mode?: number;   檔案的許可權
        autoClose?: boolean;  寫入完畢後,是否自動關閉檔案
        start?: number;  從什麼位置開始寫入
     */
    const ws = fs.createWriteStream('2.text');
    
    ws.write('123');

8. pipe

在流中搭建一條管道,從可讀流中到可寫流。

可讀流中有pipe()方法,在可寫流中可以監聽pipe事件,下面實現了從可讀流中通過管道到可寫流:

    const fs = require('fs');
    const stream = require('stream');
    
    const rs = stream.Readable({
        read() {
            this.push(fs.readFileSync('./1.text')); // 檔案內容 test
            this.push(null);
        }
    });
    
    const ws = stream.Writable({
        write(chunk, encoding, cb) {
            // chunk為test buffer
            fs.writeFileSync('./2.text', chunk.toString());
            cb();
        }
    });
    
    ws.on('pipe', data => {
        // 觸發pipe事件
        console.log(data);
    });
    
    rs.pipe(ws);

9. 總結

流分為四種基本型別,兩種模式。流中的資料不是直接寫入或讀取,有快取區的概念。