如何利用 JavaScript 實現併發控制
一、前言
在開發過程中,有時會遇到需要控制任務併發執行數量的需求。
例如一個爬蟲程式,可以通過限制其併發任務數量來降低請求頻率,從而避免由於請求過於頻繁被封禁問題的發生。
接下來,本文介紹如何實現一個併發控制器。
二、示例
consttask=timeout=>newPromise((resolve)=>setTimeout(()=>{
resolve(timeout);
},timeout))
consttaskList=[1000,3000,200,1300,800,2000];
asyncfunctionstartNoConcurrentControl(){
console.time(NO_CONCURRENT_CONTROL_LOG);
awaitPromise.all(taskList.map(item=>task(item)));
console.timeEnd(NO_CONCURRENT_CONTROL_LOG);
}
startNoConcurrentControl();
上述示例程式碼利用 Promise.all 方法模擬6個任務併發執行的場景,執行完所有任務的總耗時為 3000 毫秒。
下面會採用該示例來驗證實現方法的正確性。
三、實現
由於任務併發執行的數量是有限的,那麼就需要一種資料結構來管理不斷產生的任務。
佇列的「先進先出」特性可以保證任務併發執行的順序,在JavaScript中可以通過「陣列來模擬佇列」:
classQueue{
constructor(){
this._queue=[];
}
push(value){
returnthis._queue.push(value);
}
shift(){
returnthis._queue.shift();
}
isEmpty(){
returnthis._queue.length===0;
}
}
對於每一個任務,需要管理其執行函式和引數:
classDelayedTask{
constructor(resolve,fn,args){
this.resolve=resolve;
this.fn=fn;
this.args=args;
}
}
接下來實現核心的 TaskPool 類,該類主要用來控制任務的執行:
classTaskPool{
constructor(size){
this.size=size;
this.queue=newQueue();
}
addTask(fn,args){
returnnewPromise((resolve)=>{
this.queue.push(newDelayedTask(resolve,fn,args));
if(this.size){
this.size--;
const{resolve:taskResole,fn,args}=this.queue.shift();
taskResole(this.runTask(fn,args));
}
})
}
pullTask(){
if(this.queue.isEmpty()){
return;
}
if(this.size===0){
return;
}
this.size++;
const{resolve,fn,args}=this.queue.shift();
resolve(this.runTask(fn,args));
}
runTask(fn,args){
constresult=Promise.resolve(fn(...args));
result.then(()=>{
this.size--;
this.pullTask();
}).catch(()=>{
this.size--;
this.pullTask();
})
returnresult;
}
}
TaskPool 包含三個關鍵方法:
addTask: 將新的任務放入隊列當中,並觸發任務池狀態檢測,如果當前任務池非滿載狀態,則從佇列中取出任務放入任務池中執行。 runTask: 執行當前任務,任務執行完成之後,更新任務池狀態,此時觸發主動拉取新任務的機制。 pullTask: 如果當前佇列不為空,且任務池不滿載,則主動取出佇列中的任務執行。接下來,將前面示例的併發數控制為2個:
constcc=newConcurrentControl(2);
asyncfunctionstartConcurrentControl(){
console.time(CONCURRENT_CONTROL_LOG);
awaitPromise.all(taskList.map(item=>cc.addTask(task,[item])))
console.timeEnd(CONCURRENT_CONTROL_LOG);
}
startConcurrentControl();
執行流程如下:
最終執行任務的總耗時為 5000 毫秒。
http://www.ssnd.com.cn 化妝品OEM代加工
四、高階函式優化引數傳遞
awaitPromise.all(taskList.map(item=>cc.addTask(task,[item])))
手動傳遞每個任務的引數的方式顯得非常繁瑣,這裡可以通過「高階函式實現引數的自動透傳」:
addTask(fn){
return(...args)=>{
returnnewPromise((resolve)=>{
this.queue.push(newDelayedTask(resolve,fn,args));
if(this.size){
this.size--;
const{resolve:taskResole,fn:taskFn,args:taskArgs}=this.queue.shift();
taskResole(this.runTask(taskFn,taskArgs));
}
})
}
}
改造之後的程式碼顯得簡潔了很多:
awaitPromise.all(taskList.map(cc.addTask(task)))
五、優化出隊操作
陣列一般都是基於一塊「連續記憶體」來儲存,當呼叫陣列的 shift 方法時,首先是刪除頭部元素(時間複雜度 O(1)),然後需要將未刪除元素左移一位(時間複雜度 O(n)),所以 shift 操作的時間複雜度為 O(n)。
由於JavaScript語言的特性,V8 在實現jsArray 的時候給出了一種空間和時間權衡的解決方案,在不同的場景下,jsArray 會在 FixedArray 和 HashTable 兩種模式間切換。
在 hashTable 模式下,shift 操作省去了左移的時間複雜度,其時間複雜度可以降低為 O(1),即使如此,shift 仍然是一個耗時的操作。
在陣列元素比較多且需要頻繁執行 shift 操作的場景下,可以通過「reverse + pop」的方式優化。
constBenchmark=require('benchmark');
constsuite=newBenchmark.Suite;
suite.add('shift',function(){
letcount=10;
constarr=generateArray(count);
while(count--){
arr.shift();
}
})
.add('reverse+pop',function(){
letcount=10;
constarr=generateArray(count);
arr.reverse();
while(count--){
arr.pop();
}
})
.on('cycle',function(event){
console.log(String(event.target));
})
.on('complete',function(){
console.log('Fastestis'+this.filter('fastest').map('name'));
console.log('\n')
})
.run({
async:true
})
通過 benchmark.js 跑出的基準測試資料,可以很容易地看出哪種方式的效率更高:
回顧之前 Queue 類的實現,由於只有一個數組來儲存任務,直接使用 reverse + pop 的方式,必然會影響任務執行的次序。
這裡就需要引入雙陣列的設計,一個數組負責入隊操作,一個數組負責出隊操作。
classHighPerformanceQueue{
constructor(){
this.q1=[];//用於push資料
this.q2=[];//用於shift資料
}
push(value){
returnthis.q1.push(value);
}
shift(){
letq2=this.q2;
if(q2.length===0){
constq1=this.q1;
if(q1.length===0){
return;
}
this.q1=q2;//感謝@shaonialife同學指正
q2=this.q2=q1.reverse();
}
returnq2.pop();
}
isEmpty(){
if(this.q1.length===0&&this.q2.length===0){
returntrue;
}
returnfalse;
}
}
最後通過基準測試來驗證優化的效果: