JavaScript 中如何實現併發控制
一、併發控制簡介
假設有 6 個待辦任務要執行,而我們希望限制同時執行的任務個數,即最多隻有 2 個任務能同時執行。當 正在執行任務列表 中的任何 1 個任務完成後,程式會自動從 待辦任務列表 中獲取新的待辦任務並把該任務新增到 正在執行任務列表 中。為了讓大家能夠更直觀地理解上述的過程,阿寶哥特意畫了以下 3 張圖:
1.1 階段一
1.2 階段二
1.3 階段三
好的,介紹完併發控制之後,阿寶哥將以 github 上 async-pool 這個庫來介紹一下非同步任務併發控制的具體實現。
https://github.com/rxaviers/async-pool
Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7。
二、併發控制的實現
async-pool 這個庫提供了 ES7 和 ES6 兩種不同版本的實現,在分析其具體實現之前,我們來看一下它如何使用。
2.1 asyncPool 的使用
consttimeout=i=>newPromise(resolve=>setTimeout(()=>resolve(i),i)); awaitasyncPool(2,[1000,5000,3000,2000],timeout);
在以上程式碼中,我們使用 async-pool 這個庫提供的 asyncPool 函式來實現非同步任務的併發控制。asyncPool 函式的簽名如下所示:
functionasyncPool(poolLimit,array,iteratorFn){...}
該函式接收 3 個引數:
- poolLimit(數字型別):表示限制的併發數;
- array(陣列型別):表示任務陣列;
- iteratorFn(函式型別):表示迭代函式,用於實現對每個任務項進行處理,該函式會返回一個 Promise 物件或非同步函式。
對於以上示例來說,在使用了 asyncPool 函式之後,對應的執行過程如下所示:
consttimeout=i=>newPromise(resolve=>setTimeout(()=>resolve(i),timeout); //Calliterator(i=1000) //Calliterator(i=5000) //Poollimitof2reached,waitforthequickeronetocomplete... //1000finishes //Calliterator(i=3000) //Poollimitof2reached,waitforthequickeronetocomplete... //3000finishes //Calliterator(i=2000) //Itarationiscomplete,waituntilrunningonescomplete... //5000finishes //2000finishes //Resolves,resultsarepassedingivenarrayorder`[1000,2000]`.
通過觀察以上的註釋資訊,我們可以大致地瞭解 asyncPool 函式內部的控制流程。下面我們先來分析 asyncPool 函式的 ES7 實現。
2.2 asyncPool ES7 實現
asyn程式設計客棧cfunctionasyncPool(poolLimit,iteratorFn){ constret=[];//儲存所有的非同步任務 constexecuting=[];//儲存正在執行的非同步任務 for(constitemofarray){ //呼叫iteratorFn函式建立非同步任務 constp=Promise.resolve().then(()=>iteratorFn(item,array)); ret.push(p);//儲存新的非同步任務 //當poolLimit值小於或等於總任務個數時,進行併發控制 if(poolLimit<=array.length){ //當任務完成後,從正在執行的任務陣列中移除已完成的任務 conste=p.then(()=>executing.splice(executing.indexOf(e),1)); executing.push(e);//儲存正在執行的非同步任務 if(executing.length>=poolLimit){ awaitPromise.race(executinwww.cppcns.comg);//等待較快的任務執行完成 } } } returnPromise.all(ret); }
在以上程式碼中,充分利用了 Promise.all 和 Promise.race 函式特點,再結合 ES7 中提供的 async await 特性,最終實現了併發控制的功能。利用 await Promise.race(executing); 這行語句,我們會等待 正在執行任務列表 中較快的任務執行完成之後,才會繼續執行下一次迴圈。
asyncPool ES7 實現相對比較簡單,接下來我們來看一下不使用 async await 特性要如何實現同樣的功能。
2.3 asyncPool ES6 實現
functionasyncPool(poolLimit,iteratorFn){ leti=0; constret=[];//儲存所有http://www.cppcns.com的非同步任務 constexecuting=[];//儲存正在執行的非同步任務 constenqueue=function(){ if(i===array.length){ returnPromise.resolve(); } constitem=array[i++];//獲取新的任務項 constp=Promise.resolve().then(()=>iteratorFn(item,array)); ret.push(p); letr=Promise.resolve()程式設計客棧; //當poolLimit值小於或等於總任務個數時,進行併發控制 if(poolLimit<=array.length){ //當任務完成後,從正在執行的任務陣列中移除已完成的任務 conste=p.then(()=>executing.splice(executing.indexOf(e),1)); executing.push(e); if(executing.length>=poolLimit){ r=Promise.race(executing); } } //正在執行任務列表中較快的任務執行完成之後,才會從array陣列中獲取新的待辦任務 returnr.then(()=>enqueue()); }; returnenqueue().then(()=>Promise.all(ret)); }
在 ES6 的實現版本中,通過內部封裝的 enqueue 函式來實現核心的控制邏輯。當 Promise.race(executing) 返回的 Promise 物件變成已完成狀態時,才會呼叫 enqueue 函式,從 array 陣列中獲取新的待辦任務。
三、阿寶哥有話說
在 asyncPool 這個庫的 ES7 和 ES6 的具體實現中,我們都使用到了 Promise.all 和 Promise.race 函式。其中手寫 Promise.all 是一道常見的面試題。剛好趁著這個機會,阿寶哥跟大家一起來手寫簡易版的 Promise.all 和 Promise.race 函式。
3.1 手寫 Promise.all
Promise.all(iterable) 方法會返回一個 promise 物件,當輸入的所有 promise 物件的狀態都變成 resolved 時,返回的 promise 物件就會以陣列的形式,返回每個 promise 物件 resolve 後的結果。當輸入的任何一個 promise 物件狀態變成 rejected 時,則返回的 promise 物件會 reject 對應的錯誤資訊。
Promise.all=function(iterators){ returnnewPromise((resolve,reject)=>{ if(!iterators||iterators.length===0){ resolve([]); }else{ letcount=0;//計數器,用於判斷所有任務是否執行完成 letresult=[];//結果陣列 &nhttp://www.cppcns.combsp;for(leti=0;i<iterators.length;i++){ //考慮到iterators[i]可能是普通物件,則統一包裝為Promise物件 Promise.resolve(iterators[i]).then( (data)=>{ result[i]=data;//按順序儲存對應的結果 //當所有任務都執行完成後,再統一返回結果 if(++count===iterators.length){ resolve(result); } },(err)=>{ reject(err);//任何一個Promise物件執行失敗,則呼叫reject()方法 return; } ); } } }); };
需要注意的是對於 Promise.all 的標準實現來說,它的引數是一個可迭代物件,比如 Array、String 或 Set 等。
3.2 手寫 Promise.race
Promise.race(iterable) 方法會返回一個 promise 物件,一旦迭代器中的某個 promise 物件 resolved 或 rejected,返回的 promise 物件就會 resolve 或 reject 相應的值。
Promise.race=function(iterators){ returnnewPromise((resolve,reject)=>{ for(constiterofiterators){ Promise.resolve(iter) .then((res)=>{ resolve(res); }) .catch((e)=>{ reject(e); }); } }); };
本文阿寶哥帶大家詳細分析了 async-pool 非同步任務併發控制的具體實現,同時為了讓大家能夠更好地理解 async-pool 的核心程式碼。最後阿寶哥還帶大家一起手寫簡易版的 Promise.all 和 Promise.race 函式。其實除了 Promise.all 函式之外,還存在另一個函式 —— Promise.allSettled,該函式用於解決 Promise.all 存在的問題,感興趣的小夥伴可以自行研究一下。
四、參考資源
Github - async-pool
MDN - Promise.all
MDN - Promise.race
MDN - Promise.allSettled
以上就是javascript 中如何實現併發控制的詳細內容,更多關於javaScript實現併發控制的資料請關注我們其它相關文章!