極簡 Node.js 入門 - 4.4 可寫流
阿新 • • 發佈:2020-09-29
> 極簡 Node.js 入門系列教程:[https://www.yuque.com/sunluyong/node](https://www.yuque.com/sunluyong/node)
>
> 本文更佳閱讀體驗:[https://www.yuque.com/sunluyong/node/writable](https://www.yuque.com/sunluyong/node/writable)
## 什麼是可寫流
可寫流是對資料流向裝置的抽象,用來消費上游流過來的資料,通過可寫流程式可以把資料寫入裝置,常見的是本地磁碟檔案或者 TCP、HTTP 等網路響應。看一個之前用過的例子
```javascript
process.stdin.pipe(process.stdout);
```
_**process.stdout**_ 是一個可寫流,程式把可讀流 process.stdin 傳過來的資料寫入的標準輸出裝置。在瞭解了可讀流的基礎上理解可寫流非常簡單,流就是有方向的資料,其中可讀流是資料來源,可寫流是目的地,中間的管道環節是雙向流。
## 可寫流使用
呼叫可寫流例項的 **_write() _**方法就可以把資料寫入可寫流
```javascript
const fs = require('fs');
const rs = fs.createReadStream('./w.js');
const ws = fs.createWriteStream('./copy.js');
rs.setEncoding('utf-8');
rs.on('data', chunk => {
ws.write(chunk);
});
```
前面提到過監聽了可讀流的 data 事件就會使可讀流進入流動模式,我們在回撥事件裡呼叫了可寫流的 write() 方法,這樣資料就被寫入了可寫流抽象的裝置中,也就是當前目錄下的 copy.js 檔案
write() 方法有三個引數 - **chunk** {String| Buffer},表示要寫入的資料 - **encoding** 當寫入的資料是字串的時候可以設定編碼 - **callback** 資料被寫入之後的回撥函式 ## 自定義可寫流 和自定義可讀流類似,簡單的自定義可寫流只需要兩步 1. 繼承 stream 模組的 **Writable** 類 1. 實現 **_write()** 方法
用個簡單例子演示可寫流實現,把傳入可寫流的資料轉成大寫之後輸出到標準輸出裝置 stdout ```javascript const Writable = require('stream').Writable class OutputStream extends Writable { _write(chunk, enc, done) { // 轉大寫之後寫入標準輸出裝置 process.stdout.write(chunk.toString().toUpperCase()); // 此處不嚴謹,應該是監聽寫完之後才呼叫 done process.nextTick(done); } } module.exports = OutputStream; ``` 和最終可寫流暴露出來的 write() 方法一樣, _write() 方法有三個引數,作用類似 - **chunk** 寫入的資料,大部分時候是 buffer,除非 decodeStrings 被設定為 false - **encoding** 如果資料是字串,可以設定編碼,buffer 或者 object 模式會忽略 - **callback** 資料寫入後的回撥函式,可以通知流傳入下一個資料;當出現錯誤的時候也可以設定一個 error 引數
除了在流實現中的 _write() 之外,還可以實現 _writev() 方法,一次處理多個數據塊,這個方法用於被滯留的資料寫入佇列呼叫,可以不實現
## 例項化可寫流 options
有了可寫流的類之後可以例項化使用了,例項化可寫流的時候有幾個 option 可選,瞭解一下接下來要用到的三個核心 options
- **objectMode** 預設是 false, 設定成 true 後 writable.write() 方法除了寫入 string 和 buffer 外,還可以寫入任意 JavaScript 物件。很有用的一個選項,後面介紹 transform 流的時候詳細介紹
- **highWaterMark** 每次最多寫入的資料量, Buffer 的時候預設值 16kb, objectMode 時預設值 16
- **decodeStrings** 是否把傳入的資料轉成 Buffer,預設是 true
這樣就更清楚的知道 _write() 方法傳入的引數的含義了,而且對後面介紹 back pressure 機制的理解很有幫助。
## 事件
和可讀流一樣,可寫流也有幾個常用的事件,有了可讀流的基礎,理解起來比較簡單
`**pipe**` 當可讀流呼叫 pipe() 方法向可寫流傳輸資料的時候會觸發可寫流的 pipe 事件
`**unpipe**` 當可讀流呼叫 unpipe() 方法移除資料傳遞的時候會觸發可寫流的 unpipe 事件
這兩個事件用於通知可寫流資料將要到來和將要被切斷,在通常情況下使用的很少
_writeable.write()_ 方法是有一個 bool 的返回值的,前面提到了 **_highWaterMark_**,當要求寫入的資料大於可寫流的 highWaterMark 的時候,資料不會被一次寫入,有一部分資料被滯留,這時候 writeable.write() 就會返回 **false**,如果可以處理完就會返回 true
`**drain**` 當之前存在滯留資料,也就是 writeable.write() 返回過 false,經過一段時間的消化,處理完了積壓資料,可以繼續寫入新資料的時候觸發(drain 的本意即為排水、枯竭,挺形象的)
除了 write() 方法可寫流還有一個常用的方法 end(),引數和 write() 方法相同,但也可以不傳入引數,表示沒有其它資料需要寫入,可寫流可以關閉了
`**finish**` 當呼叫 writable.end() 方法,並且所有資料都被寫入底層後會觸發 finish 事件,同樣出現錯誤後會觸發 **`error`** ** **事件 ## back pressure 瞭解了這些事件,結合上之前提到的可讀流的一些知識,就能探討一些有意思的話題了。前面章節提到過用流相對於直接操作檔案的好處之一是不會把記憶體壓爆,那麼流是怎麼做到的呢?
很容易聯想到流不是一次性把所有資料載入記憶體處理,而是一邊讀一邊寫。但一般資料讀取的速度會遠遠快於寫入的速度,那麼 pipe() 方法是怎麼做到供需平衡的呢?主要靠以下三個要點 1. 可讀流有流動和暫停兩種模式,可以通過 **pause() **和** resume() **方法切換 1. 可寫流的 **write() **方法會返回是否能處理當前的資料,每次可以處理多少是 **highWatermark** 決定的 1. 當可寫流處理完了積壓資料會觸發 **drain** 事件
可以利用這三點來做到資料讀取和寫入的同步,還是使用之前的例子,但為了使消費速度降下來,刻意隔一秒再通知完成 ```javascript class OutputStream extends Writable { _write(chunk, enc, done) { // 轉大寫之後寫入標準輸出裝置 process.stdout.write(chunk.toString().toUpperCase()); // 故意延緩通知繼續傳遞資料的時間,造成寫入速度慢的現象 setTimeout(done, 1000); } } ``` 使用一下自定義的兩個類 ```javascript const RandomNumberStream = require('./RandomNumberStream'); const OutputStream = require('./OutputStream'); const rns = new RandomNumberStream(100); const os = new OutputStream({ highWaterMark: 8 // 把水位降低,預設16k還是挺大的 }); rns.on('data', chunk => { // 當待處理佇列大於 highWaterMark 時返回 false if (os.write(chunk) === false) { console.log('pause'); rns.pause(); // 暫停資料讀取 } }); // 當待處理佇列小於 highWaterMark 時觸發 drain 事件 os.on('drain', () => { console.log('drain') rns.resume(); // 恢復資料讀取 }); ``` 結合前面的三點和註釋很容易看懂上面程式碼,這就是 pipe() 方法起作用的核心原理,官方教程中也有對 [back presure 機制](https://nodejs.org/zh-cn/docs/guides/backpressuring-in-streams/)的詳細講解
對資料的來源的去向有了大概瞭解,就可以學習使用雙向流對資料進行加工了 - duplex - tr
write() 方法有三個引數 - **chunk** {String| Buffer},表示要寫入的資料 - **encoding** 當寫入的資料是字串的時候可以設定編碼 - **callback** 資料被寫入之後的回撥函式 ## 自定義可寫流 和自定義可讀流類似,簡單的自定義可寫流只需要兩步 1. 繼承 stream 模組的 **Writable** 類 1. 實現 **_write()** 方法
用個簡單例子演示可寫流實現,把傳入可寫流的資料轉成大寫之後輸出到標準輸出裝置 stdout ```javascript const Writable = require('stream').Writable class OutputStream extends Writable { _write(chunk, enc, done) { // 轉大寫之後寫入標準輸出裝置 process.stdout.write(chunk.toString().toUpperCase()); // 此處不嚴謹,應該是監聽寫完之後才呼叫 done process.nextTick(done); } } module.exports = OutputStream; ``` 和最終可寫流暴露出來的 write() 方法一樣, _write() 方法有三個引數,作用類似 - **chunk** 寫入的資料,大部分時候是 buffer,除非 decodeStrings 被設定為 false - **encoding** 如果資料是字串,可以設定編碼,buffer 或者 object 模式會忽略 - **callback** 資料寫入後的回撥函式,可以通知流傳入下一個資料;當出現錯誤的時候也可以設定一個 error 引數
`**pipe**` 當可讀流呼叫 pipe() 方法向可寫流傳輸資料的時候會觸發可寫流的 pipe 事件
`**unpipe**` 當可讀流呼叫 unpipe() 方法移除資料傳遞的時候會觸發可寫流的 unpipe 事件
這兩個事件用於通知可寫流資料將要到來和將要被切斷,在通常情況下使用的很少
_writeable.write()_ 方法是有一個 bool 的返回值的,前面提到了 **_highWaterMark_**,當要求寫入的資料大於可寫流的 highWaterMark 的時候,資料不會被一次寫入,有一部分資料被滯留,這時候 writeable.write() 就會返回 **false**,如果可以處理完就會返回 true
`**drain**` 當之前存在滯留資料,也就是 writeable.write() 返回過 false,經過一段時間的消化,處理完了積壓資料,可以繼續寫入新資料的時候觸發(drain 的本意即為排水、枯竭,挺形象的)
除了 write() 方法可寫流還有一個常用的方法 end(),引數和 write() 方法相同,但也可以不傳入引數,表示沒有其它資料需要寫入,可寫流可以關閉了
`**finish**` 當呼叫 writable.end() 方法,並且所有資料都被寫入底層後會觸發 finish 事件,同樣出現錯誤後會觸發 **`error`** ** **事件 ## back pressure 瞭解了這些事件,結合上之前提到的可讀流的一些知識,就能探討一些有意思的話題了。前面章節提到過用流相對於直接操作檔案的好處之一是不會把記憶體壓爆,那麼流是怎麼做到的呢?
很容易聯想到流不是一次性把所有資料載入記憶體處理,而是一邊讀一邊寫。但一般資料讀取的速度會遠遠快於寫入的速度,那麼 pipe() 方法是怎麼做到供需平衡的呢?主要靠以下三個要點 1. 可讀流有流動和暫停兩種模式,可以通過 **pause() **和** resume() **方法切換 1. 可寫流的 **write() **方法會返回是否能處理當前的資料,每次可以處理多少是 **highWatermark** 決定的 1. 當可寫流處理完了積壓資料會觸發 **drain** 事件
可以利用這三點來做到資料讀取和寫入的同步,還是使用之前的例子,但為了使消費速度降下來,刻意隔一秒再通知完成 ```javascript class OutputStream extends Writable { _write(chunk, enc, done) { // 轉大寫之後寫入標準輸出裝置 process.stdout.write(chunk.toString().toUpperCase()); // 故意延緩通知繼續傳遞資料的時間,造成寫入速度慢的現象 setTimeout(done, 1000); } } ``` 使用一下自定義的兩個類 ```javascript const RandomNumberStream = require('./RandomNumberStream'); const OutputStream = require('./OutputStream'); const rns = new RandomNumberStream(100); const os = new OutputStream({ highWaterMark: 8 // 把水位降低,預設16k還是挺大的 }); rns.on('data', chunk => { // 當待處理佇列大於 highWaterMark 時返回 false if (os.write(chunk) === false) { console.log('pause'); rns.pause(); // 暫停資料讀取 } }); // 當待處理佇列小於 highWaterMark 時觸發 drain 事件 os.on('drain', () => { console.log('drain') rns.resume(); // 恢復資料讀取 }); ``` 結合前面的三點和註釋很容易看懂上面程式碼,這就是 pipe() 方法起作用的核心原理,官方教程中也有對 [back presure 機制](https://nodejs.org/zh-cn/docs/guides/backpressuring-in-streams/)的詳細講解
對資料的來源的去向有了大概瞭解,就可以學習使用雙向流對資料進行加工了 - duplex - tr