JS 实现请求调度器
作者:孟陬 发布时间:2024-04-22 22:37:24
目录
抽象和复用
串行
分段串行,段中并行
总结
前言:JS 天然支持并行请求,但与此同时会带来一些问题,比如会造成目标服务器压力过大,所以本文引入“请求调度器”来节制并发度。
TLDR; 直接跳转『抽象和复用』章节。
为了获取一批互不依赖的资源,通常从性能考虑可以用 Promise.all(arrayOfPromises)
来并发执行。比如我们已有 100 个应用的 id,需求是聚合所有应用的 PV,我们通常会这么写:
const ids = [1001, 1002, 1003, 1004, 1005];
const urlPrefix = 'http://opensearch.example.com/api/apps';
// fetch 函数发送 HTTP 请求,返回 Promise
const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch);
Promise.all(appPromises)
// 通过 reduce 做累加
.then(apps => apps.reduce((initial, current) => initial + current.pv, 0))
.catch((error) => console.log(error));
上面的代码在应用个数不多的情况下,可以运行正常。当应用个数达到成千上万时,对支持并发数不是很好的系统,你的「压测」会把第三放服务器搞挂,暂时无法响应请求:
<html>
<head><title>502 Bad Gateway</title></head>
<body bgcolor="white">
<center><h1>502 Bad Gateway</h1></center>
<hr><center>nginx/1.10.1</center>
</body>
</html>
如何解决呢?
一个很自然的想法是,既然不支持这么多的并发请求,那就分割成几大块,每块为一个 chunk
,chunk
内部的请求依然并发,但块的大小(chunkSize
)限制在系统支持的最大并发数以内。前一个 chunk
结束后一个 chunk
才能继续执行,也就是说 chunk
内部的请求是并发的,但 chunk
之间是串行的。思路其实很简单,写起来却有一定难度。总结起来三个操作:分块、串行、聚合
难点在如何串行执行 Promise,Promise 仅提供了并行(Promise.all
)功能,并没有提供串行功能。我们从简单的三个请求开始,看如何实现,启发式解决问题(heuristic)。
// task1, task2, task3 是三个返回 Promise 的工厂函数,模拟我们的异步请求
const task1 = () => new Promise((resolve) => {
setTimeout(() => {
resolve(1);
console.log('task1 executed');
}, 1000);
});
const task2 = () => new Promise((resolve) => {
setTimeout(() => {
resolve(2);
console.log('task2 executed');
}, 1000);
});
const task3 = () => new Promise((resolve) => {
setTimeout(() => {
resolve(3);
console.log('task3 executed');
}, 1000);
});
// 聚合结果
let result = 0;
const resultPromise = [task1, task2, task3].reduce((current, next) =>
current.then((number) => {
console.log('resolved with number', number); // task2, task3 的 Promise 将在这里被 resolve
result += number;
return next();
}),
Promise.resolve(0)) // 聚合初始值
.then(function(last) {
console.log('The last promise resolved with number', last); // task3 的 Promise 在这里被 resolve
result += last;
console.log('all executed with result', result);
return Promise.resolve(result);
});
运行结果如图 1:
代码解析:我们想要的效果,直观展示其实是 fn1().then(() => fn2()).then(() => fn3())
。上面代码能让一组 Promise
按顺序执行的关键之处就在 reduce
这个“引擎”在一步步推动 Promise
工厂函数的执行。
难点解决了,我们看看最终代码:
/**
* 模拟 HTTP 请求
* @param {String} url
* @return {Promise}
*/
function fetch(url) {
console.log(`Fetching ${url}`);
return new Promise((resolve) => {
setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000);
});
}
const urlPrefix = 'http://opensearch.example.com/api/apps';
const aggregator = {
/**
* 入口方法,开启定时任务
*
* @return {Promise}
*/
start() {
return this.fetchAppIds()
.then(ids => this.fetchAppsSerially(ids, 2))
.then(apps => this.sumPv(apps))
.catch(error => console.error(error));
},
/**
* 获取所有应用的 ID
*
* @private
*
* @return {Promise}
*/
fetchAppIds() {
return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
},
promiseFactory(ids) {
return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch));
},
/**
* 获取所有应用的详情
*
* 一次并发请求 `concurrency` 个应用,称为一个 chunk
* 前一个 `chunk` 并发完成后一个才继续,直至所有应用获取完毕
*
* @private
*
* @param {[Number]} ids
* @param {Number} concurrency 一次并发的请求数量
* @return {[Object]} 所有应用的信息
*/
fetchAppsSerially(ids, concurrency = 100) {
// 分块
let chunkOfIds = ids.splice(0, concurrency);
const tasks = [];
while (chunkOfIds.length !== 0) {
tasks.push(this.promiseFactory(chunkOfIds));
chunkOfIds = ids.splice(0, concurrency);
}
// 按块顺序执行
const result = [];
return tasks.reduce((current, next) => current.then((chunkOfApps) => {
console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n');
result.push(...chunkOfApps); // 拍扁数组
return next();
}), Promise.resolve([]))
.then((lastchunkOfApps) => {
console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n');
result.push(...lastchunkOfApps); // 再次拍扁它
console.info('All chunks has been executed with result', result);
return result;
});
},
/**
* 聚合所有应用的 PV
*
* @private
*
* @param {[]} apps
* @return {[type]} [description]
*/
sumPv(apps) {
const initial = { pv: 0 };
return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial);
}
};
// 开始运行
aggregator.start().then(console.log);
运行结果如图 2:
抽象和复用
目的达到了,因具备通用性,下面开始抽象成一个模式以便复用。
串行
先模拟一个 http get 请求。
/**
* mocked http get.
* @param {string} url
* @returns {{ url: string; delay: number; }}
*/
function httpGet(url) {
const delay = Math.random() * 1000;
console.info('GET', url);
return new Promise((resolve) => {
setTimeout(() => {
resolve({
url,
delay,
at: Date.now()
})
}, delay);
})
}
串行执行一批请求。
const ids = [1, 2, 3, 4, 5, 6, 7];
// 批量请求函数,注意是 delay 执行的『函数』对了,否则会立即将请求发送出去,达不到串行的目的
const httpGetters = ids.map(id =>
() => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)
);
// 串行执行之
const tasks = await httpGetters.reduce((acc, cur) => {
return acc.then(cur);
// 简写,等价于
// return acc.then(() => cur());
}, Promise.resolve());
tasks.then(() => {
console.log('done');
});
注意观察控制台输出,应该串行输出以下内容:
GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
GET https://jsonplaceholder.typicode.com/posts/7
分段串行,段中并行
重点来了。本文的请求调度器实现
/**
* Schedule promises.
* @param {Array<(...arg: any[]) => Promise<any>>} factories
* @param {number} concurrency
*/
function schedulePromises(factories, concurrency) {
/**
* chunk
* @param {any[]} arr
* @param {number} size
* @returns {Array<any[]>}
*/
const chunk = (arr, size = 1) => {
return arr.reduce((acc, cur, idx) => {
const modulo = idx % size;
if (modulo === 0) {
acc[acc.length] = [cur];
} else {
acc[acc.length - 1].push(cur);
}
return acc;
}, [])
};
const chunks = chunk(factories, concurrency);
let resps = [];
return chunks.reduce(
(acc, cur) => {
return acc
.then(() => {
console.log('---');
return Promise.all(cur.map(f => f()));
})
.then((intermediateResponses) => {
resps.push(...intermediateResponses);
return resps;
})
},
Promise.resolve()
);
}
测试下,执行调度器:
// 分段串行,段中并行
schedulePromises(httpGetters, 3).then((resps) => {
console.log('resps:', resps);
});
控制台输出:
---
GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
---
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
---
GET https://jsonplaceholder.typicode.com/posts/7
resps: [
{
"url": "https://jsonplaceholder.typicode.com/posts/1",
"delay": 733.010980640727,
"at": 1615131322163
},
{
"url": "https://jsonplaceholder.typicode.com/posts/2",
"delay": 594.5056229848931,
"at": 1615131322024
},
{
"url": "https://jsonplaceholder.typicode.com/posts/3",
"delay": 738.8230109146299,
"at": 1615131322168
},
{
"url": "https://jsonplaceholder.typicode.com/posts/4",
"delay": 525.4604386109747,
"at": 1615131322698
},
{
"url": "https://jsonplaceholder.typicode.com/posts/5",
"delay": 29.086379722201183,
"at": 1615131322201
},
{
"url": "https://jsonplaceholder.typicode.com/posts/6",
"delay": 592.2345027398272,
"at": 1615131322765
},
{
"url": "https://jsonplaceholder.typicode.com/posts/7",
"delay": 513.0684467560949,
"at": 1615131323284
}
]
总结
如果并发请求的数量太大,可以考虑分块串行,块中请求并发。
问题看似复杂,不放先简化之,然后一步步推导出关键点,最后抽象,就能找到解决方案。
本文的精髓在于使用
reduce
作为串行推动的引擎,故掌握其对我们日常开发遇到的迷局破解可提供新思路,reduce
精通见上篇 你终于用 Reduce 了 🎉。
来源:https://juejin.cn/post/6936859831060037668


猜你喜欢
- 一、字典、元组的多重嵌套例 1:记录全班学生的成绩。分析:定义一个 SimpleGradebook类,学生名是字典self._grades的
- 有一个ssqdatav2数据,要找到其中的深圳,并且替换成圳。因为收集到的数据出现了错误,本来只有省份简写的地方却出现了深圳。如何找到DF中
- 本文主要列出来python图形开发GUI库pyqt5的窗体,控件属性与方法如果你想看看python图形开发GUI库pyqt5的基础使用方法可
- 前言mysql 相信大部分人都用过,索引肯定也是用过的,但是你知道如何创建恰当的索引吗?在数据量小的时候,不合适的索引对性能并不会有太大的影
- 介绍图灵完备性(Turing completeness)是通用计算机的一个属性,它表示一个程序可以写另一个程序。比如 go test 命令:
- 最近需要将csv文件转成DataFrame并以json的形式展示到前台,故需要用到Dataframe的to_json方法to_json方法默
- 编程中有时候需要一个初始极大值(或极小值)作为temp,当然可以自定义设置为10000(whatever),不过python中有一个值可以代
- 总览在Python中,您需要通过打开文件来访问文件。您可以使用 open()函数来实现。Open 返回一个文件对象,该文件对象具有用于获取有
- 问题描述输入样例:Only the 11 CAPItal LeTtERS are replaced输出样例:Only the 11 XZKI
- 目录一、常见的高阶函数1.1、filter1.2、map1.3、reduce高阶函数,英文叫 Higher Order function。一
- SQL Server 数据库定时自动备份,供大家参考,具体内容如下在SQL Server中出于数据安全的考虑,所以需要定期的备份数据库。而备
- 目录前后端传输数据的编码格式Ajax提交urlencoded格式数据Ajax通过FormData上传文件Ajax提交Json格式数据Ajax
- bool是Boolean的缩写,只有真(True)和假(False)两种取值bool函数只有一个参数,并根据这个参数的值返回真或者假。1.当
- 模块 定义计算机在开发过程中,代码越写越多,也就越难以维护,所以为了编写可维护的代码,我们会把函数进行分组,放在不同的文件里。在python
- query方法在 pandas 中,支持把字符串形式的查询表达式传入 query 方法来查询数据,其表达式的执行结果必须返回布尔列表。在进行
- pyecharts中的Funnel函数可以绘制漏斗图,自动根据数据大小生成由大到小自上而下排列的一个漏斗样的图形。1、导入Funnel模块。
- Part 1: 简介在PyTorch中,torch.cat()是一个被广泛使用的函数。它可以让我们在某个维度上把多个张量组合在一起。对于那些
- 本文实例讲述了Python中super函数用法。分享给大家供大家参考,具体如下:这是个高大上的函数,在python装13手册里面介绍过多使用
- 1,System.ComponentModelSystem.ComponentModel 命名空间提供用于实现组件和控件的运行时和设计时行为
- EXISTS该函数返回集合中第一个元素的索引,如果集合为空,返回NULLNULLNULLCollection.EXISTS(index)CO