1. 程式人生 > 程式設計 >JS 實現請求排程器

JS 實現請求排程器

前言:js 天然支援並行請求,但與此同時會帶來一些問題,比如會造成目標伺服器壓力過大,所以本文引入“請求排程器”來節制併發度。

TLDR; 直接跳轉『抽象和複用』章節。

為了獲取一批互不依賴的資源,通常從效能考慮可以用 Promise.all(arrayOfPromises)來併發執行。比如我們已有 100 個應用的 id,需求是聚合所有應用的 PV,我們通常會這麼寫:

const ids = [1001,1002,1003,1004,1005];
const urlPrefix = 'http://opensearch.example.com/api/apps';

// fetch 函式傳送 HTTP 請求,返回 Promise
const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch);

Promise.all(appPromises)
 // 通過 reduce 做累加
 .then(apps => apps.reduce((initial,current) => initial + current.pv,0))
 .catch((error) => console.log(error));

上面的程式碼在應用個數不多的情況下,可以執行正常。當應用個數達到成千上萬時,對支援併發數不是很好的系統,你的「壓測」會把第三放伺服器搞掛,暫時無法響應請求:

<html>
<head><title>502 Bad Gateway</title></head>
<body bgcolor="white">
<center><h1>502 Bad Gateway</h1></center>
<hr><center>nginx/1.10.1</center>
</body>
</html>

如何解決呢?

一個很自然的想法是,既然不支援這麼多的併發請求,那就分割成幾大塊,每塊為一個 chunkchunk 內部的請求依然併發,但塊的大小(chunkSize)限制在系統支援的最大併發數以內。前一個 chunk 結束後一個 chunk 才能繼續執行,也就是說 chunk 內部的請求是併發的,但 chunk 之間是序列的。思路其實很簡單,寫起來卻有一定難度。總結起來三個操作:分塊、序列、聚合

難點在如何序列執行 Promise,Promise 僅提供了並行(Promise.all)功能,並沒有提供序列功能。我們從簡單的三個請求開始,看如何實現,啟發式解決問題(heuristic)。

// task1,task2,task3 是三個返回 Promise 的工廠函式,模擬我們的非同步請求
const task1 = () => new Promise((resolve) => {
 setTimeout(() => {
 resolve(1);
 console.log('task1 executed');
 },1000);
});

const task2 = () => new Promise((resolve) => {
 setTimeout(() => {
 resolve(2);
 console.log('task2 executed');
 },1000);
});

const task3 = () => new Promise((resolve) => {
 setTimeout(() => {
 resolve(3);
 console.log('task3 executed');
 },1000);
});

// 聚合結果
let result = 0;

const resultPromise = [task1,task3].reduce((current,next) => 	 
 current.then((number) => {
 console.log('resolved with number',number); // task2,task3 的 Promise 將在這裡被 resolve
 result += number;

 return next();
 }),Promise.resolve(0)) // 聚合初始值

 .then(function(last) {
 console.log('The last promise resolved with number',last); // task3 的 Promise 在這裡被 resolve

 result += last;

 console.log('all executed with result',result);

 return Promise.resolve(result);
 });

執行結果如圖 1:

JS 實現請求排程器

程式碼解析:我們想要的效果,直觀展示其實是 fn1().then(() => fn2()).then(() => fn3())。上面程式碼能讓一組 Promise 按順序執行的關鍵之處就在 reduce 這個“引擎”在一步步推動 Promise 工廠函式的執行。

難點解決了,我們看看最終程式碼:

/**
 * 模擬 HTTP 請求
 * @param {String} url 
 * @return {Promise}
 */
function fetch(url) {
 console.log(`Fetching ${url}`);
 return new Promise((resolve) => {
 setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }),2000);
 });
}

const urlPrefix = 'http://opensearch.example.com/api/apps';

const aggregator = {
 /**
 * 入口方法,開啟定時任務
 * 
 * @return {Promise}
 */
 start() {
 return this.fetchAppIds()
 .then(ids => this.fetchAppsSerially(ids,2))
 .then(apps => this.sumPv(apps))
 .catch(error => console.error(error));
 },/**
 * 獲取所有應用的 ID
 *
 * @private
 * 
 * @return {Promise}
 */
 fetchAppIds() {
 return Promise.resolve([1001,1005]);
 },promiseFactory(ids) {
 return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch));
 },/**
 * 獲取所有應用的詳情
 * 
 * 一次併發請求 `concurrency` 個應用,稱為一個 chunk
 * 前一個 `chunk` 併發完成後一個才繼續,直程式設計客棧至所有應用獲取完畢
 *
 * @private
 *
 * @param {[Number]} ids
 * @param {Number} concurrency 一次併發的請求數量
 * @return {[Object]}  所有應用的資訊
 */
 fetchAppsSerially(ids,concurrency = 100) {
 // 分塊
 let chunkOfIds = ids.splice(0,concurrency);
 const tasks = [];
 
 while (chunkOfIds.length !== 0) {
 tasks.push(this.promiseFactory(chunkOfIds));
 chunkOfIds = ids.splice(0,concurrency);
 }
 
 // 按塊順序執行
 con程式設計客棧st result = [];
 return tasks.reduce((current,next) => current.then((chunkOfApps) => {
 console.info('Chunk of',chunkOfApps.length,'concurrency requests has finished with result:',chunkOfApps,'\n\n');
 result.push(...chunkOfApps); // 拍扁陣列
 return next();
 }),Promise.resolve([]))
 .then((lastchunkOfApps) => {
 console.info('Chunk of',lastchunkOfApps.length,lastchunkOfApps,'\n\n');

 result.push(...lastchunkOfApps); // 再次拍扁它
 console.info('All chunks has been executed with result',result);
 return result;
 });
 },/**
 * 聚合所有應用的 PV
 * 
 * @private
 * 
 * @param {[]} apps 
 * @return {[type]} [description]
 */
 sumPv(apps) {
 const initial = { pv: 0 };

 return apps.reduce((accumulator,app) => ({ pv: accumulator.pv + app.pv }),initial);
 }
};

// 開始執行
aggregator.start().then(console.log);

執行結果如圖 2:

JS 實現請求排程器

抽象和複用

目的達到了,因具備通用性,下面開始抽象成一個模式以便複用。

序列

先模擬一個 http get 請求。

/**
 * mocked http get.
 * @param {string} url
 * @returns {{ url: string; delay: number; }}
 */
function httpGet(url) {
 const delay = Math.random() * 1000;

 console.info('GET',url);

 return new Promise((resolve) => {
 setTimeout(() => {
 resolve({
 url,delay,at: Date.now()
 })
 },delay);
 })
}

序列執行一批請求。

const ids = [1,2,3,4,5,6,7];

// 批量請求函式,注意是 deAEZCIflay 執行的『函式』對了,否則會立即將請求傳送出去,達不到序列的目的
const httpGetters = ids.map(id => 
 () => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)
);

// 序列執行之
const tasks = await httpGetters.reduce((acc,cur) => {
 return acc.then(cur);
 
 // 簡寫,等價於
 // return acc.then(() => cur());
},Promise.resolve());

tasks.then(() => {
 console.log('done');
});

注意觀察控制檯輸出,應該序列輸出以下內容:

GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
GET https://jsonplaceholder.http://www.cppcns.comtypicode.com/posts/7

分段序列,段中並行

重點來了。本文的請求排程器實現

/**
 * Schedule promises.
 * @param {Array<(...arg: any[]) => Promise<any>>} factories 
 * @param {number} concurrency 
 */
function schedulePromises(factories,concurrency) {
 /**
 * chunk
 * @param {any[]} arr 
 * @param {number} size 
 * @returns {Array<any[]>}
 */
 const chunk = (arr,size = 1) => {
 return arr.reduce((acc,cur,idx) => {
 const modulo = idx % size;

 if (modulo === 0) {
 acc[acc.length] = [cur];
 } else {
 acc[acc.length - 1].push(cur);
 }

 return acc;
 },[])
 };

 const chunks = chunk(factories,concurrency);

 let resps = [];

 return chunks.reduce(
 (acc,cur) => {
 return acc
 .then(() => {
  console.log('---');
  return Promise.all(cur.map(f => f()));
 })
 .then((intermediateResponses) => {
  resps.push(...intermediateResponses);

  return resps;
 })
 },Promise.resolve()
 );
}

測試下,執行排程器:

// 分段序列,段中並行
schedulePromises(httpGetters,3).then((resps) => {
 console.log('resps:',resps);
});

控制檯輸出:

---
GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
---
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
---
GET https://jsonplaceholder.typicode.com/posts/7

resps: [
 {
 "url": "https://jsonplaceholder.typicode.com/posts/1","delay": 733.010980640727,"at": 1615131322163
 },{
 "url": "https://jsonplaceholder.typicode.com/posts/2","delay": 594.5056229848931,"at": 1615131322024
 },{
 "url": "https://jsonplaceholder.typicode.com/posts/3","delay": 738.8230109146299,"at": 1615131322168
 },{
 "url": "https://jsonplaceholder.typicode.com/posts/4","delay": 525.4604386109747,"at": 1615131322698
 },{
 "url": "https://jsonplaceholder.typicode.com/posts/5","delay": 29.086379722201183,"at": 1615131322201
 },{
 "url": "https://jsonplaceholder.typicode.com/posts/6","delay": 592.2345027398272,"at": 1615131322765
 },{
 "url": "https://jsonplaceholder.typicode.com/posts/7","dewww.cppcns.comlay": 513.0684467560949,"at": 1615131323284
 }
]

總結

  1. 如果併發請求的數量太大,可以考慮分塊序列,塊中請求併發。
  2. 問題看似複雜,不放先簡化之,然後一步步推匯出關鍵點,最後抽象,就能找到解決方案。
  3. 本文的精髓在於使用 reduce 作為序列推動的引擎,故掌握其對我們日常開發遇到的迷局破解可提供新思路,reduce 精通見上篇 你終於用 Reduce 了 🎉。

以上就是JS 實現請求排程器的詳細內容,更多關於JS 請求排程器的資料請關注我們其它相關文章!