async-pool的JS 實現併發控制
前言
最近看了些js 併發如何實現,也查閱了一下資料 ,查考了一下demo
日常開發中會遇到併發控制的場景,比如控制請求併發數。那麼在 JavaScript 中如何實現併發控制呢?
併發控制。
下面有8個待辦任務要執行,而我們希望限制同時執行的任務個數,即最多隻有 2 個任務能同時執行。
當 正在執行任務列表 中的任何 1 個任務完成後,程式會自動從 待辦任務列表 中獲取新的待辦任務並把該任務新增到 正在執行任務列表,
介紹 下async-pool 這個庫來介紹一下非同步任務併發控制的具體實現。
也是查閱了些資料 去研究了下asyncPool 的使用async-pool:github.com/rxaviers/as…
Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7。
async-pool 這個庫提供了 ES7 和 ES6 兩種不同版本的實現,在分析其具體實現之前,我們來看一下它如何使用。
2.1 asyncPool 的使用
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
在以上程式碼中,我們使用 async-pool 這個庫提供的 asyncPool
函式來實現非同步任務的併發控制。 asyncPool
function asyncPool(poolLimit, array, iteratorFn){ ... }
該函式接收 3 個引數:
poolLimit
(數字型別):表示限制的併發數;array
(陣列型別):表示任務陣列;iteratorFn
(函式型別):表示迭代函式,用於實現對每個任務項進行處理,該函式會返回一個 Promise 物件或非同步函式。
對於以上示例來說,在使用了 asyncPool
函式之後,對應的執行過程如下所示:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); await asyncPool(2, [1000, 5000, 3000, 2000], timeout); // Call iterator (i = 1000) // Call iterator (i = 5000) // Pool limit of 2 reached, wait for the quicker one to complete... // 1000 finishes // Call iterator (i = 3000) // Pool limit of 2 reached, wait for the quicker one to complete... // 3000 finishes // Call iterator (i = 2000) // Itaration is complete, wait until running ones complete... // 5000 finishes // 2000 finishes // Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
通過觀察以上的註釋資訊,我們可以大致地瞭解 asyncPool
函式內部的控制流程。下面我們先來分析 asyncPool
函式的 ES7 實現。
2.2 asyncPool ES7 實現
async function asyncPool(poolLimit, array, iteratorFn) { const ret = []; // 儲存所有的非同步任務 const executing = []; // 儲存正在執行的非同步任務 for (const item of array) { // 呼叫iteratorFn函式建立非同步任務 const p = Promise.resolve().then(() => iteratorFn(item, array)); ret.push(p); // 儲存新的非同步任務 // 當poolLimit值小於或等於總任務個數時,進行併發控制 if (poolLimit <= array.length) { // 當任務完成後,從正在執行的任務陣列中移除已完成的任務 const e = p.then(() => executing.splice(executing.indexOf(e), 1)); executing.push(e); // 儲存正在執行的非同步任務 if (executing.length >= poolLimit) { await Promise.race(executing); // 等待較快的任務執行完成 } } } return Promise.all(ret); }
在以上程式碼中,充分利用了 Promise.all
和 Promise.race
函式特點,再結合 ES7 中提供的 async await
特性,
最終實現了併發控制的功能。
利用 await Promise.race(executing);
這行語句,我們會等待 正在執行任務列表 中較快的任務執行完成之後,才會繼續執行下一次迴圈。
asyncPool ES7 實現相對比較簡單,接下來我們來看一下不使用 async await
特性要如何實現同樣的功能。
2.3 asyncPool ES6 實現
function asyncPool(poolLimit, array, iteratorFn) { let i = 0; const ret = []; // 儲存所有的非同步任務 const executing = []; // 儲存正在執行的非同步任務 const enqueue = function () { if (i === array.length) { return Promise.resolve(); } const item = array[i++]; // 獲取新的任務項 const p = Promise.resolve().then(() => iteratorFn(item, array)); ret.push(p); let r = Promise.resolve(); // 當poolLimit值小於或等於總任務個數時,進行併發控制 if (poolLimit <= array.length) { // 當任務完成後,從正在執行的任務陣列中移除已完成的任務 const e = p.then(() => executing.splice(executing.indexOf(e), 1)); executing.push(e); if (executing.length >= poolLimit) { r = Promise.race(executing); } } // 正在執行任務列表 中較快的任務執行完成之後,才會從array陣列中獲取新的待辦任務 return r.then(() => enqueue()); }; return enqueue().then(() => Promise.all(ret)); }
在 ES6 的實現版本中,通過內部封裝的 enqueue
函式來實現核心的控制邏輯。
當 Promise.race(executing)
返回的 Promise
物件變成已完成狀態時,才會呼叫 enqueue
函式,從 array
陣列中獲取新的待辦任務。