JS 实现请求调度器

前言:JS 天然支持并行请求,但与此同时会带来一些问题,比如会造成目标服务器压力过大,所以本文引入“请求调度器”来节制并发度。

TLDR; 直接跳转『抽象和复用』章节。

为了获取一批互不依赖的资源,通常从性能考虑可以用 Promise.all(arrayOfPromises)来并发执行。比如我们已有 100 个应用的 id,需求是聚合所有应用的 PV,我们通常会这么写:

const ids = [1001, 1002, 1003, 1004, 1005];

const urlPrefix = 'http://opensearch.example.com/api/apps';

// fetch 函数发送 HTTP 请求,返回 Promise

const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch);

Promise.all(appPromises)

// 通过 reduce 做累加

.then(apps => apps.reduce((initial, current) => initial + current.pv, 0))

.catch((error) => console.log(error));

上面的代码在应用个数不多的情况下,可以运行正常。当应用个数达到成千上万时,对支持并发数不是很好的系统,你的「压测」会把第三放服务器搞挂,暂时无法响应请求:

<html>

<head><title>502 Bad Gateway</title></head>

<body bgcolor="white">

<center><h1>502 Bad Gateway</h1></center>

<hr><center>nginx/1.10.1</center>

</body>

</html>

如何解决呢?

一个很自然的想法是,既然不支持这么多的并发请求,那就分割成几大块,每块为一个 chunkchunk 内部的请求依然并发,但块的大小(chunkSize)限制在系统支持的最大并发数以内。前一个 chunk 结束后一个 chunk 才能继续执行,也就是说 chunk 内部的请求是并发的,但 chunk 之间是串行的。思路其实很简单,写起来却有一定难度。总结起来三个操作:分块、串行、聚合

难点在如何串行执行 Promise,Promise 仅提供了并行(Promise.all)功能,并没有提供串行功能。我们从简单的三个请求开始,看如何实现,启发式解决问题(heuristic)。

// task1, task2, task3 是三个返回 Promise 的工厂函数,模拟我们的异步请求

const task1 = () => new Promise((resolve) => {

setTimeout(() => {

resolve(1);

console.log('task1 executed');

}, 1000);

});

const task2 = () => new Promise((resolve) => {

setTimeout(() => {

resolve(2);

console.log('task2 executed');

}, 1000);

});

const task3 = () => new Promise((resolve) => {

setTimeout(() => {

resolve(3);

console.log('task3 executed');

}, 1000);

});

// 聚合结果

let result = 0;

const resultPromise = [task1, task2, task3].reduce((current, next) =>

current.then((number) => {

console.log('resolved with number', number); // task2, task3 的 Promise 将在这里被 resolve

result += number;

return next();

}),

Promise.resolve(0)) // 聚合初始值

.then(function(last) {

console.log('The last promise resolved with number', last); // task3 的 Promise 在这里被 resolve

result += last;

console.log('all executed with result', result);

return Promise.resolve(result);

});

运行结果如图 1:

代码解析:我们想要的效果,直观展示其实是 fn1().then(() => fn2()).then(() => fn3())。上面代码能让一组 Promise 按顺序执行的关键之处就在 reduce 这个“引擎”在一步步推动 Promise 工厂函数的执行。

难点解决了,我们看看最终代码:

/**

* 模拟 HTTP 请求

* @param {String} url

* @return {Promise}

*/

function fetch(url) {

console.log(`Fetching ${url}`);

return new Promise((resolve) => {

setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000);

});

}

const urlPrefix = 'http://opensearch.example.com/api/apps';

const aggregator = {

/**

* 入口方法,开启定时任务

*

* @return {Promise}

*/

start() {

return this.fetchAppIds()

.then(ids => this.fetchAppsSerially(ids, 2))

.then(apps => this.sumPv(apps))

.catch(error => console.error(error));

},

/**

* 获取所有应用的 ID

*

* @private

*

* @return {Promise}

*/

fetchAppIds() {

return Promise.resolve([1001, 1002, 1003, 1004, 1005]);

},

promiseFactory(ids) {

return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch));

},

/**

* 获取所有应用的详情

*

* 一次并发请求 `concurrency` 个应用,称为一个 chunk

* 前一个 `chunk` 并发完成后一个才继续,直至所有应用获取完毕

*

* @private

*

* @param {[Number]} ids

* @param {Number} concurrency 一次并发的请求数量

* @return {[Object]} 所有应用的信息

*/

fetchAppsSerially(ids, concurrency = 100) {

// 分块

let chunkOfIds = ids.splice(0, concurrency);

const tasks = [];

while (chunkOfIds.length !== 0) {

tasks.push(this.promiseFactory(chunkOfIds));

chunkOfIds = ids.splice(0, concurrency);

}

// 按块顺序执行

const result = [];

return tasks.reduce((current, next) => current.then((chunkOfApps) => {

console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n');

result.push(...chunkOfApps); // 拍扁数组

return next();

}), Promise.resolve([]))

.then((lastchunkOfApps) => {

console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n');

result.push(...lastchunkOfApps); // 再次拍扁它

console.info('All chunks has been executed with result', result);

return result;

});

},

/**

* 聚合所有应用的 PV

*

* @private

*

* @param {[]} apps

* @return {[type]} [description]

*/

sumPv(apps) {

const initial = { pv: 0 };

return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial);

}

};

// 开始运行

aggregator.start().then(console.log);

运行结果如图 2:

抽象和复用

目的达到了,因具备通用性,下面开始抽象成一个模式以便复用。

串行

先模拟一个 http get 请求。

/**

* mocked http get.

* @param {string} url

* @returns {{ url: string; delay: number; }}

*/

function httpGet(url) {

const delay = Math.random() * 1000;

console.info('GET', url);

return new Promise((resolve) => {

setTimeout(() => {

resolve({

url,

delay,

at: Date.now()

})

}, delay);

})

}

串行执行一批请求。

const ids = [1, 2, 3, 4, 5, 6, 7];

// 批量请求函数,注意是 delay 执行的『函数』对了,否则会立即将请求发送出去,达不到串行的目的

const httpGetters = ids.map(id =>

() => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)

);

// 串行执行之

const tasks = await httpGetters.reduce((acc, cur) => {

return acc.then(cur);

// 简写,等价于

// return acc.then(() => cur());

}, Promise.resolve());

tasks.then(() => {

console.log('done');

});

注意观察控制台输出,应该串行输出以下内容:

GET https://jsonplaceholder.typicode.com/posts/1

GET https://jsonplaceholder.typicode.com/posts/2

GET https://jsonplaceholder.typicode.com/posts/3

GET https://jsonplaceholder.typicode.com/posts/4

GET https://jsonplaceholder.typicode.com/posts/5

GET https://jsonplaceholder.typicode.com/posts/6

GET https://jsonplaceholder.typicode.com/posts/7

分段串行,段中并行

重点来了。本文的请求调度器实现

/**

* Schedule promises.

* @param {Array<(...arg: any[]) => Promise<any>>} factories

* @param {number} concurrency

*/

function schedulePromises(factories, concurrency) {

/**

* chunk

* @param {any[]} arr

* @param {number} size

* @returns {Array<any[]>}

*/

const chunk = (arr, size = 1) => {

return arr.reduce((acc, cur, idx) => {

const modulo = idx % size;

if (modulo === 0) {

acc[acc.length] = [cur];

} else {

acc[acc.length - 1].push(cur);

}

return acc;

}, [])

};

const chunks = chunk(factories, concurrency);

let resps = [];

return chunks.reduce(

(acc, cur) => {

return acc

.then(() => {

console.log('---');

return Promise.all(cur.map(f => f()));

})

.then((intermediateResponses) => {

resps.push(...intermediateResponses);

return resps;

})

},

Promise.resolve()

);

}

测试下,执行调度器:

// 分段串行,段中并行

schedulePromises(httpGetters, 3).then((resps) => {

console.log('resps:', resps);

});

控制台输出:

---

GET https://jsonplaceholder.typicode.com/posts/1

GET https://jsonplaceholder.typicode.com/posts/2

GET https://jsonplaceholder.typicode.com/posts/3

---

GET https://jsonplaceholder.typicode.com/posts/4

GET https://jsonplaceholder.typicode.com/posts/5

GET https://jsonplaceholder.typicode.com/posts/6

---

GET https://jsonplaceholder.typicode.com/posts/7

resps: [

{

"url": "https://jsonplaceholder.typicode.com/posts/1",

"delay": 733.010980640727,

"at": 1615131322163

},

{

"url": "https://jsonplaceholder.typicode.com/posts/2",

"delay": 594.5056229848931,

"at": 1615131322024

},

{

"url": "https://jsonplaceholder.typicode.com/posts/3",

"delay": 738.8230109146299,

"at": 1615131322168

},

{

"url": "https://jsonplaceholder.typicode.com/posts/4",

"delay": 525.4604386109747,

"at": 1615131322698

},

{

"url": "https://jsonplaceholder.typicode.com/posts/5",

"delay": 29.086379722201183,

"at": 1615131322201

},

{

"url": "https://jsonplaceholder.typicode.com/posts/6",

"delay": 592.2345027398272,

"at": 1615131322765

},

{

"url": "https://jsonplaceholder.typicode.com/posts/7",

"delay": 513.0684467560949,

"at": 1615131323284

}

]

总结

  1. 如果并发请求的数量太大,可以考虑分块串行,块中请求并发。
  2. 问题看似复杂,不放先简化之,然后一步步推导出关键点,最后抽象,就能找到解决方案。
  3. 本文的精髓在于使用 reduce 作为串行推动的引擎,故掌握其对我们日常开发遇到的迷局破解可提供新思路,reduce 精通见上篇 你终于用 Reduce 了 🎉。

以上就是JS 实现请求调度器的详细内容,更多关于JS 请求调度器的资料请关注其它相关文章!

以上是 JS 实现请求调度器 的全部内容, 来源链接: utcz.com/p/219717.html

回到顶部