nodejs中的併發程式設計
阿新 • • 發佈:2020-03-20
## 從sleep的實現說起
在nodejs中,如果要實現sleep的功能主要是通過“setTimeout + promise”實現,也可以通過“迴圈空轉”來解決。前者是利用定時器實現任務的延遲執行,並通過promise鏈管理任務間的時序與依賴,本質上nodejs的執行執行緒並沒有真正的sleep,事件迴圈以及v8仍在執行,是僅僅表現在業務邏輯上sleep;而後者的實現則無疑實在浪費CPU效能,有點類似自旋鎖,不符合大多數場景。
若要實現引擎層面(執行時)的sleep,事情在[ECMAScript Latest Draft (ECMA-262)](https://tc39.es/ecma262/#sec-atomics.wait)出現之後開始有了轉機。ECMA262規定了 **Atomics.wait**,它會將呼叫該方法的代理(引擎)陷入等待佇列並讓其sleep,直到被notify或者超時。**該規範在8.10.0以上版本的nodejs上被實現。**
事實上,Atomics.wait 的出現主要解決瀏覽器或nodejs的worker之間資料同步的問題。瀏覽器上的web-worker、正式被nodejs@12納入的worker-threads模組,這些都是ECMAScript多執行緒模型的具體實現。既然出現多執行緒那麼執行緒間的同步也就不可避免的被提到,在前端以及nodejs範圍內可以使用Atomics.wait和notify來解決。
說的有些跑題,回到本節,如何實現執行時的sleep呢?很簡單,利用Atomics.wait的等待超時機制:
```
let sharedBuf = new SharedArrayBuffer(4);
let sharedArr = new Int32Array(sharedBuf);
// 睡眠n秒
let sleep = function(n){
Atomics.wait(sharedArr, 0, 0, n * 1000);
}
```
此處的sleep並不是非同步方法,它會阻塞執行執行緒直到超時,因此需要根據業務場景來使用該sleep模型。
關於Atomics.wait的具體使用方法,下文會著重講解。
## 多執行緒同步
雖然nodejs多執行緒使用場景不是很多,但是一旦涉及到多執行緒,那麼執行緒間同步就必不可少,否則無法解決臨界區的問題。不過nodejs的work_threads對執行緒的建立不同於c或者java,它使用libuv的API建立執行緒 “uv_thread_create”,但是在此之前需要初始化一些設施如MessagePort、v8例項設定等,因此建立一個thread並不是一個輕量級的操作,需要結合場景酌情建立適量的threads。
回到正題,多執行緒間的同步一般需要依賴鎖,而鎖的實現需要依賴於全域性變數。在nodejs的work_threads實現中,主執行緒無法設定全域性變數,因此可以通過Atomics實現。正如上例中所示,Atomics.wait依賴 **SharedArrayBuffer**,這是共享記憶體的ArrayBuffer,threads之間可通過它共享資料,可真正操作ArrayBuffer時並不直接使用該物件,而是TypeArray。如Atomics.wait,第一個引數必須是**Int32Array**物件,而該物件指向的緩衝區為SharedArrayBuffer。當執行緒A因為Atomics.wait而阻塞後,可通過其它執行緒B呼叫**Atomics.notify**進行喚醒從而讓執行緒A的v8繼續執行。
```
let { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
var sab = new SharedArrayBuffer(1024);
var int32 = new Int32Array(sab);
if (isMainThread) {
const worker = new Worker(__filename, {
workerData: sab
});
worker.on('message', (d) => {
console.log('parent receive message:', d);
});
worker.on('error', (e) => {
console.error('parent receive error', e);
});
worker.on('exit', (code) => {
if (code !== 0)
console.error(new Error(`工作執行緒使用退出碼 ${code} 停止`));
});
Atomics.wait(int32, 0, 0); // A
console.log(int32[0]); // C: 123
} else {
let buf = workerData;
let arrs = new Int32Array(buf);
Atomics.store(arrs, 0, 123);
Atomics.notify(arrs, 0); // B
}
```
上例中,主執行緒建立thread後,在A處進行阻塞;在新執行緒中,通過原子操作**Atomics.store**修改SharedArrayBuffer的第一項為123後,於B處喚醒阻塞在SharedArrayBuffer第一項的其它執行緒;此時主執行緒被喚醒,執行`console.log(int32[0])`,輸出被新執行緒修改後的SharedArrayBuffer第一項資料123。
### 鎖
分析一個公平、排它、不可重入鎖的實現,它使用**Atomics.wait/notify/compareExchange**完成執行緒的同步。
```
main-thread.js
let Lock = require('./lock').Lock;
let { Worker } = require('worker_threads');
const sharedBuffer = new SharedArrayBuffer(1 * Int32Array.BYTES_PER_ELEMENT);
const sharedArray = new Int32Array(sharedBuffer);
let worker = new Worker('./worker-lock.js', {
workerData: sharedBuffer
});
Lock.initialize(sharedArray, 0);
const lock = new Lock(sharedArray, 0);
// 獲取鎖
lock.lock();
// 3s後釋放鎖
setTimeout(() => {
lock.unlock(); // (B)
}, 3000)
```
```
worker-thread.js
let Lock = require('./lock').Lock;
let { parentPort, workerData } = require('worker_threads');
const sharedArray = new Int32Array(workerData);
const lock = new Lock(sharedArray, 0);
console.log('Waiting for lock...'); // (A)
// 獲取鎖
lock.lock(); // (B) blocks!
console.log('Unlocked'); // (C)
```
主執行緒初始化互斥鎖,同時建立執行緒,主執行緒獲取鎖後三秒鐘釋放;
worker執行緒嘗試獲取鎖,此時鎖已被主執行緒獲取,因此worker執行緒在此阻塞,等待3s後主執行緒釋放鎖被喚醒,繼續執行輸出。
```
lock.js
const UNLOCKED = 0;
const LOCKED_NO_WAITERS = 1;
const LOCKED_POSSIBLE_WAITERS = 2;
const NUMINTS = 1;
class Lock {
// 'iab' must be a Int32Array mapping shared memory.
// 'ibase' must be a valid index in iab, the first of NUMINTS reserved for the lock.
constructor(iab, ibase) {
if (!(iab instanceof Int32Array && ibase|0 === ibase && ibase >= 0 && ibase+NUMINTS <= iab.length)) {
throw new Error(`Bad arguments to Lock constructor: ${iab} ${ibase}`);
}
this.iab = iab;
this.ibase = ibase;
}
static initialize(iab, ibase) {
if (!(iab instanceof Int32Array && ibase|0 === ibase && ibase >= 0 && ibase+NUMINTS <= iab.length)) {
throw new Error(`Bad arguments to Lock constructor: ${iab} ${ibase}`);
}
Atomics.store(iab, ibase, UNLOCKED);
return ibase;
}
// Acquire the lock, or block until we can. Locking is not recursive:
lock() {
const iab = this.iab;
const stateIdx = this.ibase;
var c;
if ((c = Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS)) !== UNLOCKED) { // A
do {
if (c === LOCKED_POSSIBLE_WAITERS
|| Atomics.compareExchange(iab, stateIdx, LOCKED_NO_WAITERS, LOCKED_POSSIBLE_WAITERS) !== UNLOCKED) {
Atomics.wait(iab, stateIdx, LOCKED_POSSIBLE_WAITERS, Number.POSITIVE_INFINITY);
}
} while ((c = Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_POSSIBLE_WAITERS)) !== UNLOCKED); // B
}
}
tryLock() {
const iab = this.iab;
const stateIdx = this.ibase;
return Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS) === UNLOCKED;
}
unlock() {
const iab = this.iab;
const stateIdx = this.ibase;
var v0 = Atomics.sub(iab, stateIdx, 1);
// Wake up a waiter if there are any
if (v0 !== LOCKED_NO_WAITERS) {
Atomics.store(iab, stateIdx, UNLOCKED);
Atomics.notify(iab, stateIdx, 1);
}
}
toString() {
return "Lock:{ibase:" + this.ibase +"}";
}
}
exports.Lock = Lock;
```
當程序A嘗試獲取鎖成功時,A處判斷語句為false,因此由compareExchange設定狀態為LOCKED_NO_WAITERS,直接執行其後續邏輯;
若程序B此時執行lock獲取鎖時,A處判斷為true,進入do while迴圈體,在wait處sleep;
程序A通過unlock釋放鎖,會將鎖狀態置為UNLOCKED,同時喚醒阻塞的程序B;
程序B執行迴圈判斷語句B,此時為false,跳出迴圈執行B的邏輯。
當然,也可通過tryLock實現自旋鎖或者其他邏輯實現非阻塞等待。
## 參考
[libuv漫談之執行緒](https://zhuanlan.zhihu.com/p/25973650)
[Atomics](https://tc39.es/ecma262/#sec-atomics.wait)
[Atomics MDN](https://developer.mozilla.org/zh-TW/docs/Web/JavaScript/Reference/Global_Objects/