Node.js 多執行緒——worker_threads
Node.js 是如何工作的
Node.js 使用兩種執行緒:event loop處理的主執行緒和worker pool中的幾個輔助執行緒。
事件迴圈是一種機制,它採用回撥(函式)並註冊它們,準備在將來的某個時刻執行。它與相關的 JavaScript 程式碼在同一個執行緒中執行。當 JavaScript 操作阻塞執行緒時,事件迴圈也會被阻止。
工作池是一種執行模型,它產生並處理單獨的執行緒,然後同步執行任務,並將結果返回到事件迴圈。事件迴圈使用返回的結果執行提供的回撥。
簡而言之,它負責非同步 I/O操作 —— 主要是與系統磁碟和網路的互動。它主要由諸如fs
(I/O 密集)或crypto
(CPU 密集)等模組使用。工作池用
基於這兩種機制,我們可以編寫如下程式碼:
fs.readFile(path.join(__dirname, './package.json'), (err, content) => { if (err) { return null; } console.log(content.toString()); });
前面提到的fs
模組告訴工作池使用其中一個執行緒來讀取檔案的內容,並在完成後通知事件迴圈。然後事件迴圈獲取提供的回撥函式,並用檔案的內容執行它。
以上是非阻塞程式碼的示例,我們不必同步等待某事的發生。只需告訴工作池去讀取檔案,並用結果去呼叫提供的函式即可。由於工作池有自己的執行緒,因此事件迴圈可以在讀取檔案時繼續正常執行。
在不需要同步執行某些複雜操作時,這一切都相安無事:任何執行時間太長的函式都會阻塞執行緒。如果應用程式中有大量這類功能,就可能會明顯降低伺服器的吞吐量,甚至完全凍結它。在這種情況下,無法繼續將工作委派給工作池。
在需要對資料進行復雜的計算時(如AI、機器學習或大資料)無法真正有效地使用 Node.js,因為操作阻塞了主(且唯一)執行緒,使伺服器無響應。在 Node.js v10.5.0 釋出之前就是這種情況,在這一版本增加了對多執行緒的支援。
worker_threads
worker_threads
模組允許我們建立功能齊全的多執行緒 Node.js 程式。
thread worker 是在單獨的執行緒中生成的一段程式碼(通常從檔案中取出)。
注意,術語thread worker,worker和thread經常互換使用,他們都指的是同一件事。
要想使用 thread worker,必須匯入worker_threads
模組。讓我們先寫一個函式來幫助我們生成這些thread worker,然後再討論它們的屬性。
type WorkerCallback = (err: any, result?: any) => any; export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) { const worker = new Worker(path, { workerData }); worker.on('message', cb.bind(null, null)); worker.on('error', cb); worker.on('exit', (exitCode) => { if (exitCode === 0) { return null; } return cb(new Error(`Worker has stopped with code ${exitCode}`)); }); return worker; }
要建立一個 worker,首先必須建立一個Worker
類的例項。它的第一個引數提供了包含 worker 的程式碼的檔案的路徑;第二個引數提供了一個名為workerData
的包含一個屬性的物件。這是我們希望執行緒在開始執行時可以訪問的資料。
請注意:不管你是用的是 JavaScript, 還是最終要轉換為 JavaScript 的語言(例如,TypeScript),路徑應該始終引用帶有.js
或.mjs
副檔名的檔案。
我還想指出為什麼使用回撥方法,而不是返回在觸發message
事件時將解決的 promise。這是因為 worker 可以傳送許多message
事件,而不是一個。
正如你在上面的例子中所看到的,執行緒間的通訊是基於事件的,這意味著我們設定了 worker 在傳送給定事件後呼叫的偵聽器。
以下是最常見的事件:
worker.on('error', (error) => {});
只要 worker 中有未捕獲的異常,就會發出error
事件。然後終止 worker,錯誤可以作為提供的回撥中的第一個引數。
worker.on('exit', (exitCode) => {});
在 worker 退出時會發出exit
事件。如果在worker中呼叫了process.exit()
,那麼exitCode
將被提供給回撥。如果 worker 以worker.terminate()
終止,則程式碼為1。
worker.on('online', () => {});
只要 worker 停止解析 JavaScript 程式碼並開始執行,就會發出online
事件。它不常用,但在特定情況下可以提供資訊。
worker.on('message', (data) => {});
只要 worker 將資料傳送到父執行緒,就會發出message
事件。
現在讓我們來看看如何線上程之間共享資料。
線上程之間交換資料
要將資料傳送到另一個執行緒,可以用port.postMessage()
方法。它的原型如下:
port.postMessage(data[, transferList])
port 物件可以是parentPort
,也可以是MessagePort
的例項 —— 稍後會詳細講解。
資料引數
第一個引數 —— 這裡被稱為data
—— 是一個被複制到另一個執行緒的物件。它可以是複製演算法所支援的任何內容。
資料由結構化克隆演算法進行復制(包含function的物件引用都會報錯DataCloneError:xxxx could not be cloned)。引用自 Mozilla:
它通過遞迴輸入物件來進行克隆,同時保持之前訪問過的引用的對映,以避免無限遍歷迴圈。
該演算法不復制函式、錯誤、屬性描述符或原型鏈。還需要注意的是,以這種方式複製物件與使用 JSON 不同,因為它可以包含迴圈引用和型別化陣列,而 JSON 不能。
由於能夠複製型別化陣列,該演算法可以線上程之間共享記憶體。
例項:
1、程式碼
server.js
const express = require('express'); const ws = require('ws'); const convertMessage = require('./worker');//引入worker中的方法 const app = express() const wsServer = new ws.Server({ noServer: true }); wsServer.on('connection', (socket, req) => { socket.on('message', message => { console.log(message); }); }); const port = 3002 app.get('/test', (req, res) => { //1.接收到test請求,呼叫convertMessage,發起子執行緒 convertMessage().then(() => {
//5.converMessage resove後向客戶端傳送success res.send('success') }) }) app.get('/', async (req, res) => { res.send('Hello World!') }) //啟動服務 const server = app.listen(port, () => { console.log(`Example app listening at http://localhost:${port}`) }) server.on('upgrade', (request, socket, head) => { wsServer.handleUpgrade(request, socket, head, socket => { wsServer.emit('connection', socket, request); }); });
worker.js
const { Worker, workerData } = require('worker_threads') module.exports = function convertMessage() { return new Promise((resolve, reject) => { //2.子執行緒中執行./index.js檔案 const worker = new Worker('./index.js'); worker.on('message', (message) => { //4.接收到子執行緒通過postMessage傳回的message console.log(message) resolve() }); worker.on('error', reject); worker.on('exit', (code) => { //子執行緒執行完成後觸發exit事件 if (code !== 0) { reject(new Error(`Worker stopped with exit code ${code}`)); } }) }) }
index.js
const { parentPort, workerData } = require('worker_threads') for (let i = 0; i < 100; i++) { //3.通過主執行緒的parentPort,向主執行緒傳送訊息 parentPort.postMessage(`index.js執行中${i}`) }
2、測試結果
後端通過node server.js啟動服務,前端通過http://localhost:3002/test發起請求:
後端log如下:
前端結果: