1. 程式人生 > 程式設計 >nodejs中使用worker_threads來建立新的執行緒的方法

nodejs中使用worker_threads來建立新的執行緒的方法

簡介

之前的文章中提到了,nodejs中有兩種執行緒,一種是event loop用來相應使用者的請求和處理各種callback。另一種就是worker pool用來處理各種耗時操作。

nodejs的官網提到了一個能夠使用nodejs本地woker pool的lib叫做webworker-threads。

可惜的是webworker-threads的最後一次更新還是在2年前,而在最新的nodejs 12中,根本無法使用。

而webworker-threads的作者則推薦了一個新的lib叫做web-worker。

web-worker是構建於nodejs的worker_threads之上的,本文將會詳細講解worker_threads和web-worker的使用。

worker_threads

worker_threads模組的原始碼源自lib/worker_threads.js,它指的是工作執行緒,可以開啟一個新的執行緒來並行執行javascript程式。

worker_threads主要用來處理CPU密集型操作,而不是IO操作,因為nodejs本身的非同步IO已經非常強大了。

worker_threads中主要有5個屬性,3個class和3個主要的方法。接下來我們將會一一講解。

isMainThread

isMainThread用來判斷程式碼是否在主執行緒中執行,我們看一個使用的例子:

const { Worker,isMainThread } = require('worker_threads');

if (isMainThread) {
 console.log('在主執行緒中');
 new Worker(__filename);
} else {
 console.log('在工作執行緒中');
 console.log(isMainThread); // 列印 'false'。
}

上面的例子中,我們從worker_threads模組中引入了Worker和isMainThread,Worker就是工作執行緒的主類,我們將會在後面詳細講解,這裡我們使用Worker建立了一個工作執行緒。

MessageChannel

MessageChannel代表的是一個非同步雙向通訊channel。MessageChannel中沒有方法,主要通過MessageChannel來連線兩端的MessagePort。

class MessageChannel {
  readonly port1: MessagePort;
  readonly port2: MessagePort;
 }

當我們使用new MessageChannel()的時候,會自動建立兩個MessagePort。

const { MessageChannel } = require('worker_threads');

const { port1,port2 } = new MessageChannel();
port1.on('message',(message) => console.log('received',message));
port2.postMessage({ foo: 'bar' });
// Prints: received { foo: 'bar' } from the `port1.on('message')` listener

通過MessageChannel,我們可以進行MessagePort間的通訊。

parentPort和MessagePort

parentPort是一個MessagePort型別,parentPort主要用於worker執行緒和主執行緒進行訊息互動。

通過parentPort.postMessage()傳送的訊息在主執行緒中將可以通過worker.on(‘message')接收。

主執行緒中通過worker.postMessage()傳送的訊息將可以在工作執行緒中通過parentPort.on(‘message')接收。

我們看一下MessagePort的定義:

class MessagePort extends EventEmitter {
  close(): void;
  postMessage(value: any,transferList?: Array<ArrayBuffer | MessagePort>): void;
  ref(): void;
  unref(): void;
  start(): void;

  addListener(event: "close",listener: () => void): this;
  addListener(event: "message",listener: (value: any) => void): this;
  addListener(event: string | symbol,listener: (...args: any[]) => void): this;

  emit(event: "close"): boolean;
  emit(event: "message",value: any): boolean;
  emit(event: string | symbol,...args: any[]): boolean;

  on(event: "close",listener: () => void): this;
  on(event: "message",listener: (value: any) => void): this;
  on(event: string | symbol,listener: (...args: any[]) => void): this;

  once(event: "close",listener: () => void): this;
  once(event: "message",listener: (value: any) => void): this;
  once(event: string | symbol,listener: (...args: any[]) => void): this;

  prependListener(event: "close",listener: () => void): this;
  prependListener(event: "message",listener: (value: any) => void): this;
  prependListener(event: string | symbol,listener: (...args: any[]) => void): this;

  prependOnceListener(event: "close",listener: () => void): this;
  prependOnceListener(event: "message",listener: (value: any) => void): this;
  prependOnceListener(event: string | symbol,listener: (...args: any[]) => void): this;

  removeListener(event: "close",listener: () => void): this;
  removeListener(event: "message",listener: (value: any) => void): this;
  removeListener(event: string | symbol,listener: (...args: any[]) => void): this;

  off(event: "close",listener: () => void): this;
  off(event: "message",listener: (value: any) => void): this;
  off(event: string | symbol,listener: (...args: any[]) => void): this;
 }

MessagePort繼承自EventEmitter,它表示的是非同步雙向通訊channel的一端。這個channel就叫做MessageChannel,MessagePort通過MessageChannel來進行通訊。

我們可以通過MessagePort來傳輸結構體資料,記憶體區域或者其他的MessagePorts。

從原始碼中,我們可以看到MessagePort中有兩個事件,close和message。

close事件將會在channel的中任何一端斷開連線的時候觸發,而message事件將會在port.postMessage時候觸發,下面我們看一個例子:

const { MessageChannel } = require('worker_threads');
const { port1,port2 } = new MessageChannel();

// Prints:
// foobar
// closed!
port2.on('message',(message) => console.log(message));
port2.on('close',() => console.log('closed!'));

port1.postMessage('foobar');
port1.close();

port.on(‘message')實際上為message事件添加了一個listener,port還提供了addListener方法來手動新增listener。

port.on(‘message')會自動觸發port.start()方法,表示啟動一個port。

當port有listener存在的時候,這表示port存在一個ref,當存在ref的時候,程式是不會結束的。我們可以通過呼叫port.unref方法來取消這個ref。

接下來我們看一下怎麼通過port來傳輸訊息:

port.postMessage(value[,transferList])

postMessage可以接受兩個引數,第一個引數是value,這是一個JavaScript物件。第二個引數是transferList。

先看一個傳遞一個引數的情況:

const { MessageChannel } = require('worker_threads');
const { port1,port2 } = new MessageChannel();

port1.on('message',(message) => console.log(message));

const circularData = {};
circularData.foo = circularData;
// Prints: { foo: [Circular] }
port2.postMessage(circularData);

通常來說postMessage傳送的物件都是value的拷貝,但是如果你指定了transferList,那麼在transferList中的物件將會被transfer到channel的接受端,並且不再存在於傳送端,就好像把物件傳送出去一樣。

transferList是一個list,list中的物件可以是ArrayBuffer,MessagePort 和 FileHandle。

如果value中包含SharedArrayBuffer物件,那麼該物件不能被包含在transferList中。

看一個包含兩個引數的例子:

const { MessageChannel } = require('worker_threads');
const { port1,(message) => console.log(message));

const uint8Array = new Uint8Array([ 1,2,3,4 ]);
// post uint8Array的拷貝:
port2.postMessage(uint8Array);

port2.postMessage(uint8Array,[ uint8Array.buffer ]);

//port2.postMessage(uint8Array);

上面的例子將輸出:

Uint8Array(4) [ 1,4 ]
Uint8Array(4) [ 1,4 ]

第一個postMessage是拷貝,第二個postMessage是transfer Uint8Array底層的buffer。

如果我們再次呼叫port2.postMessage(uint8Array),我們會得到下面的錯誤:

DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned.

buffer是TypedArray的底層儲存結構,如果buffer被transfer,那麼之前的TypedArray將會變得不可用。

markAsUntransferable

要想避免這個問題,我們可以呼叫markAsUntransferable將buffer標記為不可transferable. 我們看一個markAsUntransferable的例子:

const { MessageChannel,markAsUntransferable } = require('worker_threads');

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
port1.postMessage(typedArray1,[ typedArray1.buffer ]);

console.log(typedArray1);
console.log(typedArray2);

SHARE_ENV

SHARE_ENV是傳遞給worker建構函式的一個env變數,通過設定這個變數,我們可以在主執行緒與工作執行緒進行共享環境變數的讀寫。

const { Worker,SHARE_ENV } = require('worker_threads');
new Worker('process.env.SET_IN_WORKER = "foo"',{ eval: true,env: SHARE_ENV })
 .on('exit',() => {
 console.log(process.env.SET_IN_WORKER); // Prints 'foo'.
 });

workerData

除了postMessage(),還可以通過在主執行緒中傳遞workerData給worker的建構函式,從而將主執行緒中的資料傳遞給worker:

const { Worker,isMainThread,workerData } = require('worker_threads');

if (isMainThread) {
 const worker = new Worker(__filename,{ workerData: 'Hello,world!' });
} else {
 console.log(workerData); // Prints 'Hello,world!'.
}

worker類

先看一下worker的定義:

 class Worker extends EventEmitter {
  readonly stdin: Writable | null;
  readonly stdout: Readable;
  readonly stderr: Readable;
  readonly threadId: number;
  readonly resourceLimits?: ResourceLimits;

  constructor(filename: string | URL,options?: WorkerOptions);

  postMessage(value: any,transferList?: Array<ArrayBuffer | MessagePort>): void;
  ref(): void;
  unref(): void;

  terminate(): Promise<number>;

  getHeapSnapshot(): Promise<Readable>;

  addListener(event: "error",listener: (err: Error) => void): this;
  addListener(event: "exit",listener: (exitCode: number) => void): this;
  addListener(event: "message",listener: (value: any) => void): this;
  addListener(event: "online",listener: () => void): this;
  addListener(event: string | symbol,listener: (...args: any[]) => void): this;

  ... 
 }

worker繼承自EventEmitter,並且包含了4個重要的事件:error,exit,message和online。

worker表示的是一個獨立的 JavaScript 執行執行緒,我們可以通過傳遞filename或者URL來構造worker。

每一個worker都有一對內建的MessagePort,在worker建立的時候就會相互關聯。worker使用這對內建的MessagePort來和父執行緒進行通訊。

通過parentPort.postMessage()傳送的訊息在主執行緒中將可以通過worker.on(‘message')接收。

主執行緒中通過worker.postMessage()傳送的訊息將可以在工作執行緒中通過parentPort.on(‘message')接收。

當然,你也可以顯式的建立MessageChannel 物件,然後將MessagePort作為訊息傳遞給其他執行緒,我們看一個例子:

const assert = require('assert');
const {
 Worker,MessageChannel,MessagePort,parentPort
} = require('worker_threads');
if (isMainThread) {
 const worker = new Worker(__filename);
 const subChannel = new MessageChannel();
 worker.postMessage({ hereIsYourPort: subChannel.port1 },[subChannel.port1]);
 subChannel.port2.on('message',(value) => {
 console.log('接收到:',value);
 });
} else {
 parentPort.once('message',(value) => {
 assert(value.hereIsYourPort instanceof MessagePort);
 value.hereIsYourPort.postMessage('工作執行緒正在傳送此訊息');
 value.hereIsYourPort.close();
 });
}

上面的例子中,我們藉助了worker和parentPort本身的訊息傳遞功能,傳遞了一個顯式的MessageChannel中的MessagePort。

然後又通過該MessagePort來進行訊息的分發。

receiveMessageOnPort

除了port的on(‘message')方法之外,我們還可以使用receiveMessageOnPort來手動接收訊息:

const { MessageChannel,receiveMessageOnPort } = require('worker_threads');
const { port1,port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined

moveMessagePortToContext

先了解一下nodejs中的Context的概念,我們可以從vm中建立context,它是一個隔離的上下文環境,從而保證不同執行環境的安全性,我們看一個context的例子:

const vm = require('vm');

const x = 1;

const context = { x: 2 };
vm.createContext(context); // 上下文隔離化物件。

const code = 'x += 40; var y = 17;';
// `x` and `y` 是上下文中的全域性變數。
// 最初,x 的值為 2,因為這是 context.x 的值。
vm.runInContext(code,context);

console.log(context.x); // 42
console.log(context.y); // 17

console.log(x); // 1; y 沒有定義。

在worker中,我們可以將一個MessagePort move到其他的context中。

worker.moveMessagePortToContext(port,contextifiedSandbox)

這個方法接收兩個引數,第一個引數就是要move的MessagePort,第二個引數就是vm.createContext()建立的context物件。

worker_threads的執行緒池

上面我們提到了使用單個的worker thread,但是現在程式中一個執行緒往往是不夠的,我們需要建立一個執行緒池來維護worker thread物件。

nodejs提供了AsyncResource類,來作為對非同步資源的擴充套件。

AsyncResource類是async_hooks模組中的。

下面我們看下怎麼使用AsyncResource類來建立worker的執行緒池。

假設我們有一個task,使用來執行兩個數相加,指令碼名字叫做task_processor.js:

const { parentPort } = require('worker_threads');
parentPort.on('message',(task) => {
 parentPort.postMessage(task.a + task.b);
});

下面是worker pool的實現:

const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
const path = require('path');
const { Worker } = require('worker_threads');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
 constructor(callback) {
 super('WorkerPoolTaskInfo');
 this.callback = callback;
 }

 done(err,result) {
 this.runInAsyncScope(this.callback,null,err,result);
 this.emitDestroy(); // `TaskInfo`s are used only once.
 }
}

class WorkerPool extends EventEmitter {
 constructor(numThreads) {
 super();
 this.numThreads = numThreads;
 this.workers = [];
 this.freeWorkers = [];

 for (let i = 0; i < numThreads; i++)
  this.addNewWorker();
 }

 addNewWorker() {
 const worker = new Worker(path.resolve(__dirname,'task_processor.js'));
 worker.on('message',(result) => {
  // In case of success: Call the callback that was passed to `runTask`,// remove the `TaskInfo` associated with the Worker,and mark it as free
  // again.
  worker[kTaskInfo].done(null,result);
  worker[kTaskInfo] = null;
  this.freeWorkers.push(worker);
  this.emit(kWorkerFreedEvent);
 });
 worker.on('error',(err) => {
  // In case of an uncaught exception: Call the callback that was passed to
  // `runTask` with the error.
  if (worker[kTaskInfo])
  worker[kTaskInfo].done(err,null);
  else
  this.emit('error',err);
  // Remove the worker from the list and start a new Worker to replace the
  // current one.
  this.workers.splice(this.workers.indexOf(worker),1);
  this.addNewWorker();
 });
 this.workers.push(worker);
 this.freeWorkers.push(worker);
 this.emit(kWorkerFreedEvent);
 }

 runTask(task,callback) {
 if (this.freeWorkers.length === 0) {
  // No free threads,wait until a worker thread becomes free.
  this.once(kWorkerFreedEvent,() => this.runTask(task,callback));
  return;
 }

 const worker = this.freeWorkers.pop();
 worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
 worker.postMessage(task);
 }

 close() {
 for (const worker of this.workers) worker.terminate();
 }
}

module.exports = WorkerPool;

我們給worker建立了一個新的kTaskInfo屬性,並且將非同步的callback封裝到WorkerPoolTaskInfo中,賦值給worker.kTaskInfo.

接下來我們就可以使用workerPool了:

const WorkerPool = require('./worker_pool.js');
const os = require('os');

const pool = new WorkerPool(os.cpus().length);

let finished = 0;
for (let i = 0; i < 10; i++) {
 pool.runTask({ a: 42,b: 100 },(err,result) => {
 console.log(i,result);
 if (++finished === 10)
  pool.close();
 });
}

到此這篇關於nodejs中使用worker_threads來建立新的執行緒的方法的文章就介紹到這了,更多相關nodejs使用worker_threads建立執行緒內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!