1. 程式人生 > 實用技巧 >nodejs中的stream(流)

nodejs中的stream(流)

一:nodeJS中的stream(流)的概念及作用?

什麼是流呢?日常生活中有水流,我們很容易想得到的就是水龍頭,那麼水龍頭流出的水是有序且有方向的(從高處往低處流)。我們在nodejs中的流也是一樣的,他們也是有序且有方向的。nodejs中的流是可讀的、或可寫的、或可讀可寫的。
並且流繼承了EventEmitter。因此所有的流都是EventEmitter的實列。

Node.js中有四種基本的流型別,如下:

1. Readable--可讀的流(比如 fs.createReadStream()).
2. Writable--可寫的流(比如 fs.createWriteStream()).
3. Duplex--可讀寫的流


4. Transform---在讀寫過程中可以修改和變換資料的Duplex流。

nodeJS中的流最大的作用是:讀取大檔案的過程中,不會一次性的讀入到記憶體中。每次只會讀取資料來源的一個數據塊。
然後後續過程中可以立即處理該資料塊(資料處理完成後會進入垃圾回收機制)。而不用等待所有的資料。

我們先來看一個簡單的流的實列來理解下:

1. 首先我們來建立一個大檔案,如下程式碼:

const fs = require('fs');
const file = fs.createWriteStream('./big.txt');
// 迴圈500萬次
for (let i = 0; i <= 5000000; i++) {
  file.write('我是空智,我來測試一個大檔案, 你看看我會有多大?');
}

file.end();

我在我專案檔案裡面新建一個app.js檔案,然後把上面的程式碼放入到 app.js 裡面去,可以看到迴圈了500萬次後,寫入500萬次資料到 big.txt中去,因此會在檔案目錄下生成一個 big.txt檔案,如下:

該檔案在我磁碟中顯示345兆。

readFile讀取該檔案:

下面我們使用 readFile 來讀取該檔案看看(readFile會一次性讀入到記憶體中)。

我們把app.js程式碼改成如下:

const fs = require('fs');
const Koa = require('koa');

const app = new Koa();

app.use(async(ctx, next) => {
  const res = ctx.res;
  fs.readFile('./big.txt', (err, data) => {
    if (err) {
      throw err;
    } else {
      res.end(data);
    }
  })
});

app.listen(3001, () => {
  console.log('listening on 3001');
});

當我們執行node app.js 後,我們檢視下該程式碼佔用的記憶體(12MB)如下:

readFile 它會把 big.txt的檔案內容整個的讀進以Buffer格式存入到記憶體中,然後再寫進返回物件,那麼這樣的效率非常低的,並且如果該檔案如果是1G或2G以上的檔案,那麼記憶體會直接被卡死掉的。或者伺服器直接會奔潰掉。

下面我們使用 Node中的createReadStream方法就可以避免佔用記憶體多的情況發生。我們把app.js 程式碼改成如下所示:

const fs = require('fs');
const Koa = require('koa');

const app = new Koa();

app.use(async(ctx, next) => {
  const res = ctx.res;
  const file = fs.createReadStream('./big.txt');
  file.pipe(res);
});

app.listen(3001, () => {
  console.log('listening on 3001');
});

然後我們繼續檢視記憶體的使用情況,如下所示:

可以看到我們的佔用的記憶體只有12.8兆。也就是說:createReadStream 在讀取大檔案的過程中,不會一次性的讀入到記憶體中。
每次只會讀取資料來源的一個數據塊。這就是流的優點。下面我們來分別看下流吧。

二:fs.createReadStream() 可讀流

其基本使用方法如下:

const fs = require('fs');
const rs = fs.createReadStream('./big.txt', {
  flags: 'r', // 檔案的操作方式,同readFile中的配置一樣,這裡預設是可讀的是 r
  encoding: 'utf-8', // 編碼格式
  autoClose: true, // 是否關閉讀取檔案作業系統內部使用的檔案描述符
  start: 0, // 開始讀取的位置
  end: 5, // 結束讀取的位置
  highWaterMark: 1 // 每次讀取的個數
});

fs.createReadStream有以下監聽事件:
具體有哪些事件可以檢視官網(http://nodejs.cn/api/stream.html#stream_class_stream_readable)這邊先截圖出來簡單看看,如下所示:

有了上面這些監聽方法,我們可以先看一個完整的實列,如下程式碼:

const fs = require('fs');
const file = fs.createReadStream('./msg.txt', {
  flags: 'r', // 檔案的操作方式,同readFile中的配置一樣,這裡預設是可讀的是 r
  encoding: 'utf-8', // 編碼格式
  autoClose: true, // 是否關閉讀取檔案作業系統內部使用的檔案描述符
  start: 0, // 開始讀取的位置
  end: 5, // 結束讀取的位置
  highWaterMark: 1 // 每次讀取的個數
});

file.on('open', () => {
  console.log('開始讀取檔案');
});

file.on('data', (data) => {
  console.log('讀取到的資料:');
  console.log(data);
});

file.on('end', () => {
  console.log('檔案全部讀取完畢');
});

file.on('close', () => {
  console.log('檔案被關閉');
});

file.on('error', (err) => {
  console.log('讀取檔案失敗');
});

執行如下圖所示:

從上圖我們可以看到,先開啟檔案,執行open事件,然後就是不斷的觸發data事件,等data事情讀取結束後會觸發end事件,然後會將檔案關閉,觸發close事件。

注意:msg.txt檔案內容如下:hello world; 但是上面為什麼只讀了 hello了,那是因為我們上面限制了從開始讀取位置讀取,然後到結束位置結束(5). 並且限定了 highWaterMark: 1,每次讀取的個數為1。當然如果我們改成每次讀取的個數為2的話,那麼每次會讀2個字元。

pause() 方法:

如果我們在讀取的過程中,想暫停事件的讀取,我們可以使用 ReadStream物件的pause方法暫停data事件的觸發。 如下程式碼:

file.on('data', (data) => {
  console.log('讀取到的資料:');
  console.log(data);
  file.pause();
});

然後如下圖所示:

上面暫停了使用 pause()方法,如果我們現在想重新讀取,需要使用 resume()方法,如下所示:

setTimeout(() => {
  file.resume();
}, 100);

執行結果如下:

其他的一些事件,比如 readable事件等,可以看官方文件 (http://nodejs.cn/api/stream.html#stream_event_readable). 這裡就不多分析了。

三:fs.createWriteStream() 可寫流

如下程式碼演示:

const fs = require('fs');
const file = fs.createWriteStream('./1.txt', {
  flags: 'w', // 檔案的操作方式,同writeFile中的配置一樣,這裡預設是可讀的是 w
  encoding: 'utf-8', // 編碼格式
  autoClose: true, // 是否關閉讀取檔案作業系統內部使用的檔案描述符
  start: 0, // 開始讀取的位置
  highWaterMark: 1 // 每次寫入的個數
});

let f1 = file.write('1', 'utf-8', () => {
  console.log('寫入成功1111');
});

f1 = file.write('2', 'utf-8', () => {
  console.log('寫入成功2222');
});

f1 = file.write('3', 'utf-8', () => {
  console.log('寫入成功3333');
});

// 標記檔案末尾
file.end();

// 處理事件
file.on('finish', () => {
  console.log('寫入完成');
});

file.on('error', (err) => {
  console.log(err);
});

在我專案的根目錄下會生成一個 1.txt檔案,裡面有123內容。

詳細請看官網(http://nodejs.cn/api/fs.html#fs_fs_writefile_file_data_options_callback

管道流(pipe)

我們需要把我們上面可讀流讀到的資料需要放到可寫流中去寫入到檔案裡面去。我們可以如下操作程式碼:

const fs = require('fs');

// 讀取msg.txt中的字串 hello world
const msg = fs.createReadStream('./msg.txt', {
  highWaterMark: 5
});

// 寫入到1.txt中
const f1 = fs.createWriteStream('./1.txt', {
  encoding: 'utf-8',
  highWaterMark: 1
});

// 監聽讀取的資料過程,把讀取的資料寫入到我們的1.txt檔案裡面去
msg.on('data', (chunk) => {
  f1.write(chunk, 'utf-8', () => {
    console.log('寫入成功');
  });
});

但是實現如上的機制,我們可以使用管道機制,管道提供了一個輸出流到輸入流的機制。通常我們用於從一個流中獲取資料並將資料傳遞到另外一個流中。如下圖所示:

const fs = require('fs');

// 讀取msg.txt中的字串 hello world
const msg = fs.createReadStream('./msg.txt', {
  highWaterMark: 5
});

// 寫入到1.txt中
const f1 = fs.createWriteStream('./1.txt', {
  encoding: 'utf-8',
  highWaterMark: 1
});

const res = msg.pipe(f1);
console.log(res);