【JavaScript】吃飽了撐的系列之JavaScript模擬多執行緒併發
阿新 • • 發佈:2019-09-08
前言
最近,明學是一個火熱的話題,而我,卻也想當那麼一回明學家,那就是,把JavaScript和多執行緒併發這兩個八竿子打不找的東西,給硬湊了起來,還寫了一個併發庫concurrent-thread-js。尷尬的是,當我發現其中的不合理之處,即這個東東的應用場景究竟是什麼時,我發現我已經把程式碼寫完了。
⚠️注意! 本文中的執行緒指的都是用JS非同步函式模擬的“假執行緒”,不是真正意義上的多執行緒,請不要誤解⚠️
沒錯,一般來說JS中模擬多執行緒我們也許會選用webworker,但是它必須要求你手動建立額外的webworker指令碼檔案,並通過new work('work.js')這種方式使用,這並不能達到我專案中想要的API的效果,而且注意:webwork中的環境不是window!很多方法你調用不了的。你只能採取這種方案,也即在主執行緒完成該功能,這是我沒有選擇webworker的另一個原因。
說是這樣說,但其實在大多數時候還是用webworker就夠了
github地址
https://github.com/penghuwan/concurrent-thread.js
本文的目的
事實上,這個庫用處很小,但是在寫的過程中,我對Promise,Async函式以及event事件流的使用產生了新的認識,同時也逐漸去學習和了解怎麼去從零開始去寫一個非業務的,通用的npm模組,所以希望拿出來和大家分享一下,這才是本文的真正的目的。 好,我們從一個故事開始。 場景一 場景二github地址
https://github.com/penghuwan/concurrent-thread.jsgithub.com注意!倘若不考慮webworker這種解決方案,我們一般都認為JS是單執行緒的。
concurrent-thread-js功能簡介
為單執行緒的JavaScript實現併發協調的功能,語意,命名和作用性質上參考Java的實現,提sleep/join/interupt等API以及鎖和條件變數等內容,並提供執行緒間通訊的功能,依賴ES6語法,基於Promise和Async函式實現,故需要Babel編譯才能執行。JavaScrpt本來就是單執行緒的,所以這只是在API的層面實現了模擬,在下文的介紹中,每條所謂的執行緒其實就是普通的非同步函式,並在此基礎上實現不同執行緒的協調配合。為什麼不選用webworker實現?
什麼時候使用concurrent-thread-js
這個問題真是靈魂拷問,可是既然程式碼寫都寫了,我怎麼也得編一個理由出來啊!額。。。讓我想想哈 它的作用是:當JS工程需要讓兩個函式在執行上不互相干擾,同時也不希望它們會阻塞主執行緒,與此同時,還希望這兩個函式實現類似併發多執行緒之間的協調的需求的時候,你可以使用這個併發模擬庫,實際上這種應用場景。。。這尼瑪有這種應用場景嗎?!(扎心了呀)。API總覽
- submit(function,[namespace]): 接收一個函式,普通函式或Async函式均可,並非同步執行"執行緒"
- sleep(ms): "執行緒"休眠,可指定休眠時間ms,以毫秒計算
- join(threadName): "執行緒"同步,呼叫此方法的"執行緒"函式將在threadName執行結束後繼續執行
- interupt(threadName): "執行緒"中斷,影響"執行緒"內部調this.isInterrupted()的返回值
- Lock.lock: 加鎖,一個時刻只能有一個"執行緒"函式進入臨界區,其他"執行緒"函式需要等待,鎖是非公平的,也就是說後面排隊的執行緒函式沒有先後,以隨機的方式進行競爭。
- Lock.unlock:解除非公平鎖
- Condition.wait:不具備執行條件,"執行緒"進入waiting狀態,等待被喚醒
- Condition.notify:隨機喚醒一個wait的"執行緒"
- Condition.notifyAll: 尚未編寫,喚醒所有wait的"執行緒"
- getState: 還沒寫完 獲取"執行緒"狀態,包括RUNNALE(執行),WAITING(等待),BLOCKED(阻塞),TERMINATED(終止)
- ThreadPool類:包含submit/sleep/join/interrupt/getState方法
- Lock類:包含Lock.lock和Lock.unLock方法
- Condition類:包含Condition.wait和Condition.notify方法
A1.submit方法
submit模擬提交執行緒至執行緒池// 備註:為循序漸進介紹,以下為簡化程式碼 // 儲存每個執行緒函式的狀態,例如是否中斷,以及執行緒狀態等 const threadMap = {}; class ThreadPool { // 模擬執行緒中斷 interrupt(threadName) { } // 模擬執行緒同步 join(threadName, targetThread) { } // 模擬執行緒休眠 sleep(ms) { } }; function submit(func, name) { if (!func instanceof Function) return; // 方式1:傳入一個具名函式;方式2:傳入第二個引數,即執行緒名稱空間 const threadName = func.name || name; // threadMap負責儲存執行緒狀態資料 threadMap[threadName] = { state: RUNNABLE, isInterrupted: false }; // 讓func非同步呼叫,同時將傳入函式的作用域繫結為 ThreadPool原型 Promise.resolve({ then: func.bind(ThreadPool.prototype); }) }
首先,我們做了三件事情:
- 獲取執行緒函式的名稱空間,並初始化執行緒初始資料,不同執行緒狀態由threadMap全域性儲存
- 將提交的函式func作為Promise.resolve方法中的一個thenable物件的then引數,這相當於立即"完成"一個Promise,同時在then方法中執行func,func會以非同步而不是同步的方式進行執行,你也可以簡單的理解成類似於執行了setTimeOut(func,0);
- 將func的作用域繫結為新生成的ThreadPool例項,ThreadPool中定義了我們上面我們介紹到的方法,如sleep/join/interupt等,這有什麼好處呢?這意味著我們可以直接在函式中通過呼叫this.interrupt的方式去呼叫我們定義的API了,符合我們的使用習慣(注意,class中定義的除箭頭函式外的普通函式實際上都存放在原型中)
submit(async function example() { this.interrupt(); });
但問題在於:現在因為所有的函式通過this呼叫的都是ThreadPool原型中的方法,我們要在呼叫唯一的interrupt方法,需要在非同步函式中傳入"執行緒"標識,如執行緒名。這顯然不方便,也不優雅,例如下面的命名為example的執行緒函式
submit(async function example() { this.interrupt('example'); });
使用這個模組使用者會感到奇怪:我明明在example函式中,為什麼還要給呼叫方法傳example這個名字引數??難道不能在模組內部把這事情幹了嗎? 對!我們下面做的就是這件事情,我們編寫一個delegateThreadPool方法,由它為ThreadPool代理處理不同“執行緒“函式的函式名
// 返回代理後的ThreadPool function delegateThreadPool(threadName) { // threadName為待定的執行緒名,在submit方法呼叫時候傳入 // 代理後的ThreadPool const proxyClass = {}; // 獲取ThreadPool原來的所有的方法,賦給props陣列 var props = Object.getOwnPropertyNames(ThreadPool.prototype); for (let prop of props) { // 代理ThreadPool,為其所有方法增加threadName這個引數 let fnName = prop; proxyClass[fnName] = (...args) => { const fn = baseClass[fnName]; return fn(threadName, ...args); }; } return proxyClass; } function submit(func, name) { // 省略其他程式碼 。。。 const proxyScope = delegateThreadPool(threadName); // 讓func非同步呼叫,不阻塞主執行緒,同時實現併發 Promise.resolve({ then: function () { // 給func繫結this為代理後的ThreadPool物件,以便呼叫方法 func.call(proxyScope); } }); } // 呼叫this.sleep方法時,已經無需增加函式命名作為引數了 submit(async function example() { this.interrupt(); });
也就是說,我們的執行緒函式func繫結的已經不是ThreadPool.prototype了,而是delegateThreadPool處理後返回的物件:proxyScope。這時候,我們在“執行緒”函式體裡呼叫this.interrupt方法時,已經無需增加函式命名作為引數了,因為這個工作,proxyScope物件幫我們做了,其實它的工作很簡單——就是它的每個函式,都在一個返回的閉包裡面呼叫ThreadPool的同名函式,並傳遞執行緒名作為第一個引數。
A2. sleep方法
作用:執行緒休眠 sleep方法很簡單,無非就是返回一個Promise例項,在Promise的函式裡面調setTimeOut,等時間到了執行resolve函式,這段時間裡修飾Promise的await語句會阻塞一段時間,resolve後又await語句又繼續向下執行了,能滿足我們想要的休眠效果// 模擬“執行緒”休眠 sleep(ms) { return new Promise(function (resolve) { setTimeout(resolve, ms); }) } // 提交“執行緒” submit(async function example() { // 阻塞停留3秒,然後才輸出1 await this.sleep(3000); console.log(1); });
A3. interrupt方法
作用:執行緒中斷,可用於處理執行緒停止等操作 這裡要先介紹一下Java裡面的interrupt方法:在JAVA裡,你不能通過呼叫terminate方法停掉一個執行緒,因為這有可能會因為處理邏輯突然中斷而導致資料不一致的問題,所以要通過interrupt方法把一箇中斷標誌位置為true,然後通過isInterrupted方法作為判斷條件跳出關鍵程式碼。 所以為了模擬,我在JS中處理“執行緒”中斷也是這麼去做的,但是我們這樣做的根本原因是:我們壓根沒有可以停掉一個執行緒函式的方法!(JAVA是有但是不準用,即廢棄了而已)// 模擬執行緒中斷 interrupt(threadName) { if (!threadName) { throw new Error('Miss function parameters') } if (threadMap[threadName]) { threadMap[threadName].isInterrupted = true; } } // 獲取執行緒中斷狀態 isInterrupted(threadName) { if (!threadName) { throw new Error('Miss function parameters') } // !!的作用是:將undefined轉為false return !!threadMap[threadName].isInterrupted; }
A4. join方法 join(threadName): "執行緒"同步,呼叫此方法的"執行緒"函式將在threadName執行結束後繼續執行 join方法和上面的sleep方法是一樣的道理,我們讓它返回一個Promise,只要我們不調resolve,那麼外部修飾Promise的await語句就會一直暫停,等到join的那個另一個執行緒執行完了,我們看準時機!把這個Promise給resolve,這時候外部修飾Promise的await語句不就又可以向下執行了嗎? 但問題在於:我們如何實現這個“一個函式執行完通知另一個函式的功能呢”?沒錯!那就是我們JavaScript最喜歡的套路: 事件流! 我們下面使用event-emitter這個前後端通用的模組實現事件流。 我們只要在任何一個函式結束的時候觸發結束事件(join-finished),同時傳遞該執行緒的函式名作為引數,然後在join方法內部監聽該事件,並在響應時候呼叫resolve方法不就可以了嘛。 首先是在join方法內部監聽執行緒函式的結束事件
import ee from 'event-emitter'; const emitter = ee(); // 模擬執行緒同步 join(threadName, targetThread) { return new Promise((resolve) => { // 監聽其他執行緒函式的結束事件 emitter.on('join-finished', (finishThread) => { // 根據結束執行緒的執行緒名finishThread做判斷 if (finishThread === targetThread) { resolve(); } }) }) }
同時線上程函式執行結束時觸發join-finished事件,傳遞執行緒名做引數
import ee from 'event-emitter'; const emitter = ee(); function submit(func, name) { // ... Promise.resolve({ then: func().then(() => { emitter.emit('join-finished', threadName); }) }); }使用如下:
submit(async function thread1 () { this.join('thread2'); console.log(1); }); submit(async function thread2 () { this.sleep(3000); console.log(2) }) // 3s後,依次輸出 2 1
A5. Lock.lock & Lock.unlock(非公平鎖)
我們主要是要編寫兩個方法:lock和unlock方法。我們需要設定一個Boolean屬性isLock- lock方法:lock方法首先會判斷isLock是否為false,如果是,則代表沒有執行緒佔領臨界區,那麼允許該執行緒進入臨界區,同時把isLock設定為true,不允許其他執行緒函式進入。其他執行緒進入時,由於判斷isLock為true,會setTimeOut每隔一段時間遞迴呼叫判斷isLock是否為false,從而以較低效能消耗的方式模擬while死迴圈。當它們檢測到isLock為false時候,則會進入臨界區,同時設定isLock為true。因為後面的執行緒沒有先後順序,所以這是一個非公平鎖
- unLock方法:unlock則是把isLock屬性設定為false,解除鎖定就可以了
// 這是一個非公平鎖 class Lock { constructor() { this.isLock = false; } //加鎖 lock() { if (this.isLock) { const self = this; // 迴圈while死迴圈,不停測試isLock是否等於false return new Promise((resolve) => { (function recursion() { if (!self.isLock) { // 佔用鎖 self.isLock = true; // 使外部await語句繼續往下執行 resolve(); return; } setTimeout(recursion, 100); })(); }); } else { this.isLock = true; return Promise.resolve(); } } // 解鎖 unLock() { this.isLock = false; } } const lockObj = new Lock(); export default lockObj;
執行示例如下:
async function commonCode() { await Lock.lock(); await Executor.sleep(3000); Lock.unLock(); } submit(async function example1() { console.log('example1 start') await commonCode(); console.log('example1 end') }); submit(async function example2() { console.log('example2 start') await commonCode(); console.log('example2 end') });輸出
// 立即輸出 example1 start example2 start // 3秒後輸出 example1 end // 再3秒後輸出 example2 end
A6. Condition.wait & Condition.notify(條件變數)
- Condition.wait:不具備執行條件,執行緒進入waiting狀態,等待被喚醒
- Condition.notify: 喚醒執行緒
import ee from 'event-emitter'; const ev = ee(); class Condition { constructor() { this.n = 0; this.list = []; } // 當不滿足條件時,讓執行緒處於等待狀態 wait() { return new Promise((resolve) => { const eventName = `notify-${this.n}`; this.n++; const list = this.list; list.push(eventName); ev.on(eventName, () => { // 從列表中刪除事件名 const i = list.indexOf(eventName); list.splice(i, 1); // 讓外部函式恢復執行 debugger; resolve(); }) }) } // 選擇一個執行緒喚醒 notify() { const list = this.list; let i = Math.random() * (this.list.length - 1); i = Math.floor(i); ev.emit(list[i]) } }
測試程式碼
async function testCode() { console.log('i will be wait'); if (true) { await Condition.wait(); }; console.log('i was notified '); } submit(async function example() { testCode(); setTimeout(() => { Condition.notify(); }, 3000); });輸出
i will be wait // 3秒後輸出 i was notified
最後的大總結
其實說到底,我想和大家分享的不是什麼併發啊,什麼多執行緒啦。 其實我想表達的是:事件監聽 + Promise + Async函式這套組合拳很好用啊- 你想讓一段程式碼停一下?OK!寫個返回Promise的函式,用await修飾,它就停啦!
- 你想控制它(await)不要停了,繼續往下走?OK! 把Promise給resolve掉,它就往下走啦
- 你說你不知道怎麼控制它停,因為監聽和發射事件的程式碼分佈在兩個地方?OK!那就使用事件流