1. 程式人生 > >Promise.all併發限制

Promise.all併發限制

背景

通常,我們在需要保證程式碼在多個非同步處理之後執行,會用到:

Promise.all(promises: []).then(fun: function);

Promise.all可以保證,promises陣列中所有promise物件都達到resolve狀態,才執行then回撥。

這時候考慮一個場景:如果你的promises陣列中每個物件都是http請求,或者說每個物件包含了複雜的呼叫處理。而這樣的物件有幾十萬個。

那麼會出現的情況是,你在瞬間發出幾十萬http請求(tcp連線數不足可能造成等待),或者堆積了無數呼叫棧導致記憶體溢位。

這時候,我們就需要考慮對Promise.all做併發限制。

Promise.all併發限制指的是,每個時刻併發執行的promise數量是固定的,最終的執行結果還是保持與原來的Promise.all一致。

實現

我們知道,promise並不是因為呼叫Promise.all才執行,而是在例項化promise物件的時候就執行了,在理解這一點的基礎上,要實現併發限制,只能從promise例項化上下手。

換句話說,就是把生成promises陣列的控制權,交給併發控制邏輯。

這裡我並不打算一步步實現這個這個功能,npm中有很多實現這個功能的第三方包,比如async-pooles6-promise-poolp-limit,這裡我直接拿async-pool的程式碼來分析一下實現原理。

程式碼很簡單,去掉不必要的程式碼,加上自己的註釋,大概內容如下:

function asyncPool(poolLimit, array, iteratorFn) {
    let i = 0;
    const ret = [];
    const executing = [];
    const enqueue = function () {
        // 邊界處理,array為空陣列
        if (i === array.length) {
            return Promise.resolve();
        }
        // 每調一次enqueue,初始化一個promise
        const item = array[i++];
        const p = Promise.resolve().then(() => iteratorFn(item, array));
        // 放入promises陣列
        ret.push(p);
        // promise執行完畢,從executing陣列中刪除
        const e = p.then(() => executing.splice(executing.indexOf(e), 1));
        // 插入executing數字,表示正在執行的promise
        executing.push(e);
        // 使用Promise.rece,每當executing陣列中promise數量低於poolLimit,就例項化新的promise並執行
        let r = Promise.resolve();
        if (executing.length >= poolLimit) {
            r = Promise.race(executing);
        }
        // 遞迴,直到遍歷完array
        return r.then(() => enqueue());
    };
    return enqueue().then(() => Promise.all(ret));
}

因為是promise加上遞迴,所以在程式碼註釋上不太好標註執行順序,但是大概的邏輯可以總結為:

  1. array第1個元素開始,初始化promise物件,同時用一個executing陣列儲存正在執行的promise
  2. 不斷初始化promise,直到達到poolLimt
  3. 使用Promise.race,獲得executing中promise的執行情況,當有一個promise執行完畢,繼續初始化promise並放入executing
  4. 所有promise都執行完了,呼叫Promise.all返回

使用方式就是:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
return asyncPool(2, [1000, 5000, 3000, 2000], timeout).then(results => {
    ...
});

總結

所謂promise併發限制,其實根源上就是控制promise的例項化。如果是通過第三方函式,那麼就把建立promise的控制權交給第三方即可。

然而這樣的實現效果,本質上來說已經拋棄了Promise.all而另闢蹊徑。所以期待有一天promise標準能提供這個功能。