1. 程式人生 > 其它 >如何利用 JavaScript 實現併發控制

如何利用 JavaScript 實現併發控制

一、前言

在開發過程中,有時會遇到需要控制任務併發執行數量的需求。

例如一個爬蟲程式,可以通過限制其併發任務數量來降低請求頻率,從而避免由於請求過於頻繁被封禁問題的發生。

接下來,本文介紹如何實現一個併發控制器。

二、示例

consttask=timeout=>newPromise((resolve)=>setTimeout(()=>{
resolve(timeout);
},timeout))

consttaskList=[1000,3000,200,1300,800,2000];

asyncfunctionstartNoConcurrentControl(){
console.time(NO_CONCURRENT_CONTROL_LOG);
awaitPromise.all(taskList.map(item=>task(item)));
console.timeEnd(NO_CONCURRENT_CONTROL_LOG);
}

startNoConcurrentControl();

上述示例程式碼利用 Promise.all 方法模擬6個任務併發執行的場景,執行完所有任務的總耗時為 3000 毫秒。

下面會採用該示例來驗證實現方法的正確性。

三、實現

由於任務併發執行的數量是有限的,那麼就需要一種資料結構來管理不斷產生的任務。

佇列的「先進先出」特性可以保證任務併發執行的順序,在JavaScript中可以通過「陣列來模擬佇列」

classQueue{
constructor(){
this._queue=[];
}

push(value){
returnthis._queue.push(value);
}

shift(){
returnthis._queue.shift();
}

isEmpty(){
returnthis._queue.length===0;
}
}

對於每一個任務,需要管理其執行函式和引數:

classDelayedTask{
constructor(resolve,fn,args){
this.resolve=resolve;
this.fn=fn;
this.args=args;
}
}

接下來實現核心的 TaskPool 類,該類主要用來控制任務的執行:

classTaskPool{
constructor(size){
this.size=size;
this.queue=newQueue();
}

addTask(fn,args){
returnnewPromise((resolve)=>{
this.queue.push(newDelayedTask(resolve,fn,args));
if(this.size){
this.size--;
const{resolve:taskResole,fn,args}=this.queue.shift();
taskResole(this.runTask(fn,args));
}
})
}

pullTask(){
if(this.queue.isEmpty()){
return;
}

if(this.size===0){
return;
}

this.size++;
const{resolve,fn,args}=this.queue.shift();
resolve(this.runTask(fn,args));
}

runTask(fn,args){
constresult=Promise.resolve(fn(...args));

result.then(()=>{
this.size--;
this.pullTask();
}).catch(()=>{
this.size--;
this.pullTask();
})

returnresult;
}
}

TaskPool 包含三個關鍵方法:

addTask: 將新的任務放入隊列當中,並觸發任務池狀態檢測,如果當前任務池非滿載狀態,則從佇列中取出任務放入任務池中執行。 runTask: 執行當前任務,任務執行完成之後,更新任務池狀態,此時觸發主動拉取新任務的機制。 pullTask: 如果當前佇列不為空,且任務池不滿載,則主動取出佇列中的任務執行。

接下來,將前面示例的併發數控制為2個:

constcc=newConcurrentControl(2);

asyncfunctionstartConcurrentControl(){
console.time(CONCURRENT_CONTROL_LOG);
awaitPromise.all(taskList.map(item=>cc.addTask(task,[item])))
console.timeEnd(CONCURRENT_CONTROL_LOG);
}

startConcurrentControl();

執行流程如下:

最終執行任務的總耗時為 5000 毫秒。

http://www.ssnd.com.cn 化妝品OEM代加工

四、高階函式優化引數傳遞

awaitPromise.all(taskList.map(item=>cc.addTask(task,[item])))

手動傳遞每個任務的引數的方式顯得非常繁瑣,這裡可以通過「高階函式實現引數的自動透傳」

addTask(fn){
return(...args)=>{
returnnewPromise((resolve)=>{
this.queue.push(newDelayedTask(resolve,fn,args));

if(this.size){
this.size--;
const{resolve:taskResole,fn:taskFn,args:taskArgs}=this.queue.shift();
taskResole(this.runTask(taskFn,taskArgs));
}
})
}
}

改造之後的程式碼顯得簡潔了很多:

awaitPromise.all(taskList.map(cc.addTask(task)))

五、優化出隊操作

陣列一般都是基於一塊「連續記憶體」來儲存,當呼叫陣列的 shift 方法時,首先是刪除頭部元素(時間複雜度 O(1)),然後需要將未刪除元素左移一位(時間複雜度 O(n)),所以 shift 操作的時間複雜度為 O(n)。

由於JavaScript語言的特性,V8 在實現jsArray 的時候給出了一種空間和時間權衡的解決方案,在不同的場景下,jsArray 會在 FixedArray 和 HashTable 兩種模式間切換。

在 hashTable 模式下,shift 操作省去了左移的時間複雜度,其時間複雜度可以降低為 O(1),即使如此,shift 仍然是一個耗時的操作。

在陣列元素比較多且需要頻繁執行 shift 操作的場景下,可以通過「reverse + pop」的方式優化。

constBenchmark=require('benchmark');
constsuite=newBenchmark.Suite;

suite.add('shift',function(){
letcount=10;
constarr=generateArray(count);
while(count--){
arr.shift();
}
})
.add('reverse+pop',function(){
letcount=10;
constarr=generateArray(count);
arr.reverse();
while(count--){
arr.pop();
}
})
.on('cycle',function(event){
console.log(String(event.target));
})
.on('complete',function(){
console.log('Fastestis'+this.filter('fastest').map('name'));
console.log('\n')
})
.run({
async:true
})

通過 benchmark.js 跑出的基準測試資料,可以很容易地看出哪種方式的效率更高:

回顧之前 Queue 類的實現,由於只有一個數組來儲存任務,直接使用 reverse + pop 的方式,必然會影響任務執行的次序。

這裡就需要引入雙陣列的設計,一個數組負責入隊操作,一個數組負責出隊操作。

classHighPerformanceQueue{
constructor(){
this.q1=[];//用於push資料
this.q2=[];//用於shift資料
}

push(value){
returnthis.q1.push(value);
}

shift(){
letq2=this.q2;
if(q2.length===0){
constq1=this.q1;
if(q1.length===0){
return;
}
this.q1=q2;//感謝@shaonialife同學指正
q2=this.q2=q1.reverse();
}

returnq2.pop();
}

isEmpty(){
if(this.q1.length===0&&this.q2.length===0){
returntrue;
}
returnfalse;
}
}

最後通過基準測試來驗證優化的效果: