1. 程式人生 > >Node.js 真·多執行緒 Worker Threads 初探

Node.js 真·多執行緒 Worker Threads 初探

基本資訊

筆者在 Node.js 最新的開發版本 v11.4.0 上測試該特性,目前需要新增 flag 才能引入 Worker Threads,例如:

node --experimental-worker index.js
複製程式碼

Worker Threads 特性是在2018年6月20日的 v10.5.0 版本引入的:

node/CHANGELOG

目前該模組處於 Stability 1 - Experimental 階段,改動會較大,不建議用於生產環境。

模組介面

const {
  isMainThread, parentPort, workerData, threadId,
  MessageChannel, MessagePort, Worker
} = require
('worker_threads'); 複製程式碼

該模組物件和類非常少,只有4個物件和3個類。

  • isMainThread:false 表示當前為 worker 執行緒,false 表示為主執行緒
  • parentPort: 在 worker 執行緒裡是表示父程序的 MessagePort 型別的物件,在主執行緒裡為 null
  • workerData: 在 worker 執行緒裡是父程序建立 worker 執行緒時的初始化資料,在主執行緒裡是 undefined
  • threadId: 在 worker 執行緒裡是執行緒 ID,在父程序裡是 0
  • MessageChannel: 包含兩個已經互相能夠誇執行緒通訊的 MessagePort
    型別物件,可用於建立自定義的通訊頻道,可參考樣例二的實現。
  • MessagePort: 用於跨執行緒通訊的控制代碼,繼承了 EventEmitter,包括 close message 事件用於接收物件關閉和傳送的訊息,以及 close postMessage 等操作。
  • Worker: 主執行緒用於建立 worker 執行緒的物件型別,包含所有的 MessagePort 操作以及一些特有的子執行緒 meta data 操作。建構函式的第一個引數是子執行緒執行的入口指令碼程式,第二個引數包含一些配置項,可以指定一些初始引數。 詳細內容見文件

記憶體模型的變更

在使用 cluster

child_process 時通常使用 SharedArrayBuffer 來實現需要多程序共享的記憶體。

port.postMessage(value[, transferList])
複製程式碼

現在 Worker Threads 模組在 API 層不建議多執行緒共享記憶體,第一個引數 value 的值會被 clone 一份在接受訊息的執行緒。transferList 只能傳遞 ArrayBuffer 或者 MessagePort 物件,傳遞 ArrayBuffer 會修改該 Buffer 的訪問許可權給接受訊息的執行緒,傳遞 MessagePort 可以參考樣例二。

所有跨執行緒訊息的通訊都通過走底層的 v8 序列化實現,更具體的 Worker Threads 和 v8 多執行緒模型以及和瀏覽器的 Web Worker 標準的關係暫不展開。

樣例一:主執行緒和 worker 執行緒通訊計數,計數到 5 後 worker 執行緒自殺

const {
  isMainThread, parentPort, workerData, threadId,
  MessageChannel, MessagePort, Worker
} = require('worker_threads');

function mainThread() {
  const worker = new Worker(__filename, { workerData: 0 });
  worker.on('exit', code => { console.log(`main: worker stopped with exit code ${code}`); });
  worker.on('message', msg => {
    console.log(`main: receive ${msg}`);
    worker.postMessage(msg + 1);
  });
}

function workerThread() {
  console.log(`worker: threadId ${threadId} start with ${__filename}`);
  console.log(`worker: workerDate ${workerData}`);
  parentPort.on('message', msg => {
    console.log(`worker: receive ${msg}`);
    if (msg === 5) { process.exit(); }
    parentPort.postMessage(msg);
  }),
  parentPort.postMessage(workerData);
}

if (isMainThread) {
  mainThread();
} else {
  workerThread();
}
複製程式碼

輸出結果:

worker: threadId 1 start with /Users/azard/test/index.js
worker: workerDate 0
main: receive 0
worker: receive 1
main: receive 1
worker: receive 2
main: receive 2
worker: receive 3
main: receive 3
worker: receive 4
main: receive 4
worker: receive 5
main: receive 5
main: worker stopped with exit code 0
複製程式碼

樣例二:使用 MessageChannel 讓兩個子執行緒直接通訊

const {
  isMainThread, parentPort, workerData, threadId,
  MessageChannel, MessagePort, Worker
} = require('worker_threads');

if (isMainThread) {
  const worker1 = new Worker(__filename);
  const worker2 = new Worker(__filename);
  const subChannel = new MessageChannel();
  worker1.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  worker2.postMessage({ hereIsYourPort: subChannel.port2 }, [subChannel.port2]);
} else {
  parentPort.once('message', (value) => {
    value.hereIsYourPort.postMessage('hello');
    value.hereIsYourPort.on('message', msg => {
      console.log(`thread ${threadId}: receive ${msg}`);
    });
  });
}
複製程式碼

輸出:

thread 2: receive hello
thread 1: receive hello
複製程式碼

總結

現在 Node.js 有了真多執行緒,不需要再用 cluster 或者 child_process 的多程序來處理一些問題了,相關的框架、庫的模型在 Worker Threads 穩定後也可以開始進行迭代更新。