JS 實現請求排程器
前言:js 天然支援並行請求,但與此同時會帶來一些問題,比如會造成目標伺服器壓力過大,所以本文引入“請求排程器”來節制併發度。
TLDR; 直接跳轉『抽象和複用』章節。
為了獲取一批互不依賴的資源,通常從效能考慮可以用 Promise.all(arrayOfPromises)
來併發執行。比如我們已有 100 個應用的 id,需求是聚合所有應用的 PV,我們通常會這麼寫:
const ids = [1001,1002,1003,1004,1005]; const urlPrefix = 'http://opensearch.example.com/api/apps'; // fetch 函式傳送 HTTP 請求,返回 Promise const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch); Promise.all(appPromises) // 通過 reduce 做累加 .then(apps => apps.reduce((initial,current) => initial + current.pv,0)) .catch((error) => console.log(error));
上面的程式碼在應用個數不多的情況下,可以執行正常。當應用個數達到成千上萬時,對支援併發數不是很好的系統,你的「壓測」會把第三放伺服器搞掛,暫時無法響應請求:
<html> <head><title>502 Bad Gateway</title></head> <body bgcolor="white"> <center><h1>502 Bad Gateway</h1></center> <hr><center>nginx/1.10.1</center> </body> </html>
如何解決呢?
一個很自然的想法是,既然不支援這麼多的併發請求,那就分割成幾大塊,每塊為一個 chunk
,chunk
內部的請求依然併發,但塊的大小(chunkSize
)限制在系統支援的最大併發數以內。前一個 chunk
結束後一個 chunk
才能繼續執行,也就是說 chunk
內部的請求是併發的,但 chunk
之間是序列的。思路其實很簡單,寫起來卻有一定難度。總結起來三個操作:分塊、序列、聚合
難點在如何序列執行 Promise,Promise 僅提供了並行(Promise.all
)功能,並沒有提供序列功能。我們從簡單的三個請求開始,看如何實現,啟發式解決問題(heuristic)。
// task1,task2,task3 是三個返回 Promise 的工廠函式,模擬我們的非同步請求 const task1 = () => new Promise((resolve) => { setTimeout(() => { resolve(1); console.log('task1 executed'); },1000); }); const task2 = () => new Promise((resolve) => { setTimeout(() => { resolve(2); console.log('task2 executed'); },1000); }); const task3 = () => new Promise((resolve) => { setTimeout(() => { resolve(3); console.log('task3 executed'); },1000); }); // 聚合結果 let result = 0; const resultPromise = [task1,task3].reduce((current,next) => current.then((number) => { console.log('resolved with number',number); // task2,task3 的 Promise 將在這裡被 resolve result += number; return next(); }),Promise.resolve(0)) // 聚合初始值 .then(function(last) { console.log('The last promise resolved with number',last); // task3 的 Promise 在這裡被 resolve result += last; console.log('all executed with result',result); return Promise.resolve(result); });
執行結果如圖 1:
程式碼解析:我們想要的效果,直觀展示其實是 fn1().then(() => fn2()).then(() => fn3())
。上面程式碼能讓一組 Promise
按順序執行的關鍵之處就在 reduce
這個“引擎”在一步步推動 Promise
工廠函式的執行。
難點解決了,我們看看最終程式碼:
/** * 模擬 HTTP 請求 * @param {String} url * @return {Promise} */ function fetch(url) { console.log(`Fetching ${url}`); return new Promise((resolve) => { setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }),2000); }); } const urlPrefix = 'http://opensearch.example.com/api/apps'; const aggregator = { /** * 入口方法,開啟定時任務 * * @return {Promise} */ start() { return this.fetchAppIds() .then(ids => this.fetchAppsSerially(ids,2)) .then(apps => this.sumPv(apps)) .catch(error => console.error(error)); },/** * 獲取所有應用的 ID * * @private * * @return {Promise} */ fetchAppIds() { return Promise.resolve([1001,1005]); },promiseFactory(ids) { return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch)); },/** * 獲取所有應用的詳情 * * 一次併發請求 `concurrency` 個應用,稱為一個 chunk * 前一個 `chunk` 併發完成後一個才繼續,直程式設計客棧至所有應用獲取完畢 * * @private * * @param {[Number]} ids * @param {Number} concurrency 一次併發的請求數量 * @return {[Object]} 所有應用的資訊 */ fetchAppsSerially(ids,concurrency = 100) { // 分塊 let chunkOfIds = ids.splice(0,concurrency); const tasks = []; while (chunkOfIds.length !== 0) { tasks.push(this.promiseFactory(chunkOfIds)); chunkOfIds = ids.splice(0,concurrency); } // 按塊順序執行 con程式設計客棧st result = []; return tasks.reduce((current,next) => current.then((chunkOfApps) => { console.info('Chunk of',chunkOfApps.length,'concurrency requests has finished with result:',chunkOfApps,'\n\n'); result.push(...chunkOfApps); // 拍扁陣列 return next(); }),Promise.resolve([])) .then((lastchunkOfApps) => { console.info('Chunk of',lastchunkOfApps.length,lastchunkOfApps,'\n\n'); result.push(...lastchunkOfApps); // 再次拍扁它 console.info('All chunks has been executed with result',result); return result; }); },/** * 聚合所有應用的 PV * * @private * * @param {[]} apps * @return {[type]} [description] */ sumPv(apps) { const initial = { pv: 0 }; return apps.reduce((accumulator,app) => ({ pv: accumulator.pv + app.pv }),initial); } }; // 開始執行 aggregator.start().then(console.log);
執行結果如圖 2:
抽象和複用
目的達到了,因具備通用性,下面開始抽象成一個模式以便複用。
序列
先模擬一個 http get 請求。
/** * mocked http get. * @param {string} url * @returns {{ url: string; delay: number; }} */ function httpGet(url) { const delay = Math.random() * 1000; console.info('GET',url); return new Promise((resolve) => { setTimeout(() => { resolve({ url,delay,at: Date.now() }) },delay); }) }
序列執行一批請求。
const ids = [1,2,3,4,5,6,7];
// 批量請求函式,注意是 deAEZCIflay 執行的『函式』對了,否則會立即將請求傳送出去,達不到序列的目的
const httpGetters = ids.map(id =>
() => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)
);
// 序列執行之
const tasks = await httpGetters.reduce((acc,cur) => {
return acc.then(cur);
// 簡寫,等價於
// return acc.then(() => cur());
},Promise.resolve());
tasks.then(() => {
console.log('done');
});
注意觀察控制檯輸出,應該序列輸出以下內容:
GET https://jsonplaceholder.typicode.com/posts/1 GET https://jsonplaceholder.typicode.com/posts/2 GET https://jsonplaceholder.typicode.com/posts/3 GET https://jsonplaceholder.typicode.com/posts/4 GET https://jsonplaceholder.typicode.com/posts/5 GET https://jsonplaceholder.typicode.com/posts/6 GET https://jsonplaceholder.http://www.cppcns.comtypicode.com/posts/7
分段序列,段中並行
重點來了。本文的請求排程器實現
/** * Schedule promises. * @param {Array<(...arg: any[]) => Promise<any>>} factories * @param {number} concurrency */ function schedulePromises(factories,concurrency) { /** * chunk * @param {any[]} arr * @param {number} size * @returns {Array<any[]>} */ const chunk = (arr,size = 1) => { return arr.reduce((acc,cur,idx) => { const modulo = idx % size; if (modulo === 0) { acc[acc.length] = [cur]; } else { acc[acc.length - 1].push(cur); } return acc; },[]) }; const chunks = chunk(factories,concurrency); let resps = []; return chunks.reduce( (acc,cur) => { return acc .then(() => { console.log('---'); return Promise.all(cur.map(f => f())); }) .then((intermediateResponses) => { resps.push(...intermediateResponses); return resps; }) },Promise.resolve() ); }
測試下,執行排程器:
// 分段序列,段中並行 schedulePromises(httpGetters,3).then((resps) => { console.log('resps:',resps); });
控制檯輸出:
---
GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
---
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
---
GET https://jsonplaceholder.typicode.com/posts/7
resps: [
{
"url": "https://jsonplaceholder.typicode.com/posts/1","delay": 733.010980640727,"at": 1615131322163
},{
"url": "https://jsonplaceholder.typicode.com/posts/2","delay": 594.5056229848931,"at": 1615131322024
},{
"url": "https://jsonplaceholder.typicode.com/posts/3","delay": 738.8230109146299,"at": 1615131322168
},{
"url": "https://jsonplaceholder.typicode.com/posts/4","delay": 525.4604386109747,"at": 1615131322698
},{
"url": "https://jsonplaceholder.typicode.com/posts/5","delay": 29.086379722201183,"at": 1615131322201
},{
"url": "https://jsonplaceholder.typicode.com/posts/6","delay": 592.2345027398272,"at": 1615131322765
},{
"url": "https://jsonplaceholder.typicode.com/posts/7","dewww.cppcns.comlay": 513.0684467560949,"at": 1615131323284
}
]
總結
- 如果併發請求的數量太大,可以考慮分塊序列,塊中請求併發。
- 問題看似複雜,不放先簡化之,然後一步步推匯出關鍵點,最後抽象,就能找到解決方案。
- 本文的精髓在於使用
reduce
作為序列推動的引擎,故掌握其對我們日常開發遇到的迷局破解可提供新思路,reduce
精通見上篇 你終於用 Reduce 了 🎉。
以上就是JS 實現請求排程器的詳細內容,更多關於JS 請求排程器的資料請關注我們其它相關文章!