1. 程式人生 > 程式設計 >JavaScript 中如何實現併發控制

JavaScript 中如何實現併發控制

一、併發控制簡介

假設有 6 個待辦任務要執行,而我們希望限制同時執行的任務個數,即最多隻有 2 個任務能同時執行。當 正在執行任務列表 中的任何 1 個任務完成後,程式會自動從 待辦任務列表 中獲取新的待辦任務並把該任務新增到 正在執行任務列表 中。為了讓大家能夠更直觀地理解上述的過程,阿寶哥特意畫了以下 3 張圖:

1.1 階段一

JavaScript 中如何實現併發控制

1.2 階段二

JavaScript 中如何實現併發控制

1.3 階段三

JavaScript 中如何實現併發控制

好的,介紹完併發控制之後,阿寶哥將以 github 上 async-pool 這個庫來介紹一下非同步任務併發控制的具體實現。

https://github.com/rxaviers/async-pool

Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7。

二、併發控制的實現

async-pool 這個庫提供了 ES7 和 ES6 兩種不同版本的實現,在分析其具體實現之前,我們來看一下它如何使用。

2.1 asyncPool 的使用

consttimeout=i=>newPromise(resolve=>setTimeout(()=>resolve(i),i));
awaitasyncPool(2,[1000,5000,3000,2000],timeout);

在以上程式碼中,我們使用 async-pool 這個庫提供的 asyncPool 函式來實現非同步任務的併發控制。asyncPool 函式的簽名如下所示:

functionasyncPool(poolLimit,array,iteratorFn){...}

該函式接收 3 個引數:

  • poolLimit(數字型別):表示限制的併發數;
  • array(陣列型別):表示任務陣列;
  • iteratorFn(函式型別):表示迭代函式,用於實現對每個任務項進行處理,該函式會返回一個 Promise 物件或非同步函式。

對於以上示例來說,在使用了 asyncPool 函式之後,對應的執行過程如下所示:

consttimeout=i=>newPromise(resolve=>setTimeout(()=>resolve(i),timeout);
//Calliterator(i=1000)
//Calliterator(i=5000)
//Poollimitof2reached,waitforthequickeronetocomplete...
//1000finishes
//Calliterator(i=3000)
//Poollimitof2reached,waitforthequickeronetocomplete...
//3000finishes
//Calliterator(i=2000)
//Itarationiscomplete,waituntilrunningonescomplete...
//5000finishes
//2000finishes
//Resolves,resultsarepassedingivenarrayorder`[1000,2000]`.

通過觀察以上的註釋資訊,我們可以大致地瞭解 asyncPool 函式內部的控制流程。下面我們先來分析 asyncPool 函式的 ES7 實現。

2.2 asyncPool ES7 實現

asyn程式設計客棧cfunctionasyncPool(poolLimit,iteratorFn){
constret=[];//儲存所有的非同步任務
constexecuting=[];//儲存正在執行的非同步任務
for(constitemofarray){
//呼叫iteratorFn函式建立非同步任務
constp=Promise.resolve().then(()=>iteratorFn(item,array));
ret.push(p);//儲存新的非同步任務

//當poolLimit值小於或等於總任務個數時,進行併發控制
if(poolLimit<=array.length){
//當任務完成後,從正在執行的任務陣列中移除已完成的任務
conste=p.then(()=>executing.splice(executing.indexOf(e),1));
executing.push(e);//儲存正在執行的非同步任務
if(executing.length>=poolLimit){
awaitPromise.race(executinwww.cppcns.comg);//等待較快的任務執行完成
}
}
}
returnPromise.all(ret);
}

在以上程式碼中,充分利用了 Promise.all 和 Promise.race 函式特點,再結合 ES7 中提供的 async await 特性,最終實現了併發控制的功能。利用 await Promise.race(executing); 這行語句,我們會等待 正在執行任務列表 中較快的任務執行完成之後,才會繼續執行下一次迴圈。

asyncPool ES7 實現相對比較簡單,接下來我們來看一下不使用 async await 特性要如何實現同樣的功能。

2.3 asyncPool ES6 實現

functionasyncPool(poolLimit,iteratorFn){
leti=0;
constret=[];//儲存所有http://www.cppcns.com的非同步任務
constexecuting=[];//儲存正在執行的非同步任務
constenqueue=function(){
if(i===array.length){
returnPromise.resolve();
}
constitem=array[i++];//獲取新的任務項
constp=Promise.resolve().then(()=>iteratorFn(item,array));
ret.push(p);

letr=Promise.resolve()程式設計客棧;

//當poolLimit值小於或等於總任務個數時,進行併發控制
if(poolLimit<=array.length){
//當任務完成後,從正在執行的任務陣列中移除已完成的任務
conste=p.then(()=>executing.splice(executing.indexOf(e),1));
executing.push(e);
if(executing.length>=poolLimit){
r=Promise.race(executing);
}
}

//正在執行任務列表中較快的任務執行完成之後,才會從array陣列中獲取新的待辦任務
returnr.then(()=>enqueue());
};
returnenqueue().then(()=>Promise.all(ret));
}

在 ES6 的實現版本中,通過內部封裝的 enqueue 函式來實現核心的控制邏輯。當 Promise.race(executing) 返回的 Promise 物件變成已完成狀態時,才會呼叫 enqueue 函式,從 array 陣列中獲取新的待辦任務。

三、阿寶哥有話說

在 asyncPool 這個庫的 ES7 和 ES6 的具體實現中,我們都使用到了 Promise.all 和 Promise.race 函式。其中手寫 Promise.all 是一道常見的面試題。剛好趁著這個機會,阿寶哥跟大家一起來手寫簡易版的 Promise.all 和 Promise.race 函式。

3.1 手寫 Promise.all

Promise.all(iterable) 方法會返回一個 promise 物件,當輸入的所有 promise 物件的狀態都變成 resolved 時,返回的 promise 物件就會以陣列的形式,返回每個 promise 物件 resolve 後的結果。當輸入的任何一個 promise 物件狀態變成 rejected 時,則返回的 promise 物件會 reject 對應的錯誤資訊。

Promise.all=function(iterators){
returnnewPromise((resolve,reject)=>{
if(!iterators||iterators.length===0){
resolve([]);
}else{
letcount=0;//計數器,用於判斷所有任務是否執行完成
letresult=[];//結果陣列
&nhttp://www.cppcns.combsp;for(leti=0;i<iterators.length;i++){
//考慮到iterators[i]可能是普通物件,則統一包裝為Promise物件
Promise.resolve(iterators[i]).then(
(data)=>{
result[i]=data;//按順序儲存對應的結果
//當所有任務都執行完成後,再統一返回結果
if(++count===iterators.length){
resolve(result);
}
},(err)=>{
reject(err);//任何一個Promise物件執行失敗,則呼叫reject()方法
return;
}
);
}
}
});
};

需要注意的是對於 Promise.all 的標準實現來說,它的引數是一個可迭代物件,比如 Array、String 或 Set 等。

3.2 手寫 Promise.race

Promise.race(iterable) 方法會返回一個 promise 物件,一旦迭代器中的某個 promise 物件 resolved 或 rejected,返回的 promise 物件就會 resolve 或 reject 相應的值。

Promise.race=function(iterators){
returnnewPromise((resolve,reject)=>{
for(constiterofiterators){
Promise.resolve(iter)
.then((res)=>{
resolve(res);
})
.catch((e)=>{
reject(e);
});
}
});
};

本文阿寶哥帶大家詳細分析了 async-pool 非同步任務併發控制的具體實現,同時為了讓大家能夠更好地理解 async-pool 的核心程式碼。最後阿寶哥還帶大家一起手寫簡易版的 Promise.all 和 Promise.race 函式。其實除了 Promise.all 函式之外,還存在另一個函式 —— Promise.allSettled,該函式用於解決 Promise.all 存在的問題,感興趣的小夥伴可以自行研究一下。

四、參考資源

Github - async-pool
MDN - Promise.all
MDN - Promise.race
MDN - Promise.allSettled

以上就是javascript 中如何實現併發控制的詳細內容,更多關於javaScript實現併發控制的資料請關注我們其它相關文章!