1. 程式人生 > 其它 >Node.js 多執行緒——worker_threads

Node.js 多執行緒——worker_threads

Node.js 是如何工作的

Node.js 使用兩種執行緒:event loop處理的主執行緒和worker pool中的幾個輔助執行緒。

事件迴圈是一種機制,它採用回撥(函式)並註冊它們,準備在將來的某個時刻執行。它與相關的 JavaScript 程式碼在同一個執行緒中執行。當 JavaScript 操作阻塞執行緒時,事件迴圈也會被阻止。

工作池是一種執行模型,它產生並處理單獨的執行緒,然後同步執行任務,並將結果返回到事件迴圈。事件迴圈使用返回的結果執行提供的回撥。

簡而言之,它負責非同步 I/O操作 —— 主要是與系統磁碟和網路的互動。它主要由諸如fs(I/O 密集)或crypto(CPU 密集)等模組使用。工作池用

libuv實現,當 Node 需要在 JavaScript 和 C++ 之間進行內部通訊時,會導致輕微的延遲,但這幾乎不可察覺。

基於這兩種機制,我們可以編寫如下程式碼:

fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
 if (err) {
   return null;
 }

 console.log(content.toString());
});

前面提到的fs模組告訴工作池使用其中一個執行緒來讀取檔案的內容,並在完成後通知事件迴圈。然後事件迴圈獲取提供的回撥函式,並用檔案的內容執行它。

以上是非阻塞程式碼的示例,我們不必同步等待某事的發生。只需告訴工作池去讀取檔案,並用結果去呼叫提供的函式即可。由於工作池有自己的執行緒,因此事件迴圈可以在讀取檔案時繼續正常執行。

在不需要同步執行某些複雜操作時,這一切都相安無事:任何執行時間太長的函式都會阻塞執行緒。如果應用程式中有大量這類功能,就可能會明顯降低伺服器的吞吐量,甚至完全凍結它。在這種情況下,無法繼續將工作委派給工作池。

在需要對資料進行復雜的計算時(如AI、機器學習或大資料)無法真正有效地使用 Node.js,因為操作阻塞了主(且唯一)執行緒,使伺服器無響應。在 Node.js v10.5.0 釋出之前就是這種情況,在這一版本增加了對多執行緒的支援。

worker_threads

worker_threads模組允許我們建立功能齊全的多執行緒 Node.js 程式。

thread worker 是在單獨的執行緒中生成的一段程式碼(通常從檔案中取出)。

注意,術語thread workerworkerthread經常互換使用,他們都指的是同一件事。

要想使用 thread worker,必須匯入worker_threads模組。讓我們先寫一個函式來幫助我們生成這些thread worker,然後再討論它們的屬性。

type WorkerCallback = (err: any, result?: any) => any;

export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
 const worker = new Worker(path, { workerData });

 worker.on('message', cb.bind(null, null));
 worker.on('error', cb);

 worker.on('exit', (exitCode) => {
   if (exitCode === 0) {
     return null;
   }

   return cb(new Error(`Worker has stopped with code ${exitCode}`));
 });

 return worker;
}

要建立一個 worker,首先必須建立一個Worker類的例項。它的第一個引數提供了包含 worker 的程式碼的檔案的路徑;第二個引數提供了一個名為workerData的包含一個屬性的物件。這是我們希望執行緒在開始執行時可以訪問的資料。

請注意:不管你是用的是 JavaScript, 還是最終要轉換為 JavaScript 的語言(例如,TypeScript),路徑應該始終引用帶有.js.mjs副檔名的檔案。

我還想指出為什麼使用回撥方法,而不是返回在觸發message事件時將解決的 promise。這是因為 worker 可以傳送許多message事件,而不是一個。

正如你在上面的例子中所看到的,執行緒間的通訊是基於事件的,這意味著我們設定了 worker 在傳送給定事件後呼叫的偵聽器。

以下是最常見的事件:

worker.on('error', (error) => {});

只要 worker 中有未捕獲的異常,就會發出error事件。然後終止 worker,錯誤可以作為提供的回撥中的第一個引數。

worker.on('exit', (exitCode) => {});

在 worker 退出時會發出exit事件。如果在worker中呼叫了process.exit(),那麼exitCode將被提供給回撥。如果 worker 以worker.terminate()終止,則程式碼為1。

worker.on('online', () => {});

只要 worker 停止解析 JavaScript 程式碼並開始執行,就會發出online事件。它不常用,但在特定情況下可以提供資訊。

worker.on('message', (data) => {});

只要 worker 將資料傳送到父執行緒,就會發出message事件。

現在讓我們來看看如何線上程之間共享資料。

線上程之間交換資料

要將資料傳送到另一個執行緒,可以用port.postMessage()方法。它的原型如下:

port.postMessage(data[, transferList])

port 物件可以是parentPort,也可以是MessagePort的例項 —— 稍後會詳細講解。

資料引數

第一個引數 —— 這裡被稱為data—— 是一個被複制到另一個執行緒的物件。它可以是複製演算法所支援的任何內容。

資料由結構化克隆演算法進行復制(包含function的物件引用都會報錯DataCloneError:xxxx could not be cloned)。引用自 Mozilla:

它通過遞迴輸入物件來進行克隆,同時保持之前訪問過的引用的對映,以避免無限遍歷迴圈。

該演算法不復制函式、錯誤、屬性描述符或原型鏈。還需要注意的是,以這種方式複製物件與使用 JSON 不同,因為它可以包含迴圈引用和型別化陣列,而 JSON 不能。

由於能夠複製型別化陣列,該演算法可以線上程之間共享記憶體。

例項:

1、程式碼

server.js

const express = require('express');
const ws = require('ws');
const convertMessage = require('./worker');//引入worker中的方法

const app = express()
const wsServer = new ws.Server({ noServer: true });
wsServer.on('connection', (socket, req) => {
    socket.on('message', message => {
        console.log(message);
    });
});
const port = 3002
app.get('/test', (req, res) => {
    //1.接收到test請求,呼叫convertMessage,發起子執行緒
    convertMessage().then(() => {
//5.converMessage resove後向客戶端傳送success res.send(
'success') }) }) app.get('/', async (req, res) => { res.send('Hello World!') }) //啟動服務 const server = app.listen(port, () => { console.log(`Example app listening at http://localhost:${port}`) }) server.on('upgrade', (request, socket, head) => { wsServer.handleUpgrade(request, socket, head, socket => { wsServer.emit('connection', socket, request); }); });

worker.js

const { Worker, workerData } = require('worker_threads')

module.exports = function convertMessage() {
    return new Promise((resolve, reject) => {
        //2.子執行緒中執行./index.js檔案
        const worker = new Worker('./index.js');
        worker.on('message', (message) => {
            //4.接收到子執行緒通過postMessage傳回的message
            console.log(message)
            resolve()
        });
        worker.on('error', reject);
        worker.on('exit', (code) => {
            //子執行緒執行完成後觸發exit事件
            if (code !== 0) {
                reject(new Error(`Worker stopped with exit code ${code}`));
            }
        })
    })
}

index.js

const { parentPort, workerData } = require('worker_threads')

for (let i = 0; i < 100; i++) {    
    //3.通過主執行緒的parentPort,向主執行緒傳送訊息
    parentPort.postMessage(`index.js執行中${i}`)
}

2、測試結果

後端通過node server.js啟動服務,前端通過http://localhost:3002/test發起請求:

後端log如下:

前端結果: