Promise 的并发控制源码解析
前言 🔗
Promise
的并发控制源码解析,一看就是老面试题了
正文 🔗
一般我们讲到 Promise
的并发控制,很多时候我们都是和 HTTP
请求结合起来的
因为浏览器是有连接数量限制的,过多的 HTTP
请求会先挂起,等待先前发送的请求完毕之后再发送挂起的请求
每个请求都需要占用额外的内存,过多的 HTTP
请求挂起还可能导致浏览器崩溃
所以,合理的发送速度就显得格外的重要
这次,我们来品一品 async-pool 这个库的源代码,它的源代码,非常的简洁,如下
async function* asyncPool(concurrency, iterable, iteratorFn) {
const executing = new Set();
// 消费其中一个 Promise ,并从 executing 中删除该完成的 Promise
async function consume() {
const [promise, value] = await Promise.race(executing);
executing.delete(promise);
return value;
}
for (const item of iterable) {
const promise = (async () => await iteratorFn(item, iterable))().then(
value => [promise, value]
);
executing.add(promise);
if (executing.size >= concurrency) {
yield await consume();
}
}
while (executing.size) {
yield await consume();
}
}
module.exports = asyncPool;
这里面核心的地方有三个,其一个是 Promise.race
这个 api
其二是 for
循环中的根据 executing.size >= concurrency
判断来 consume
掉已完成的 Promise
其三是如果“池子中” Promise
没有达到上限,则需要依次 consume
,此时和限流就无关了
Promise.race
下,如果某个 Promise
已完成或失败,那么返回的 Promise
就会对应的完成或者失败
对应到代理里面的 consume
函数,即每次我们都从 executing
中取一个已完成 Promise
,然后把该 Promise
从 executing
中删除,这样下一次调用 consume
就会继续
在 for
循环中,我们可以发现对传入的 Promise
, 对其返回的结果也进行了包装
for (const item of iterable) {
const promise = (async () => await iteratorFn(item, iterable))().then(
value => [promise, value]
);
// ...
}
这里是为了记住当前的 Promise
对象,这样在 consume
函数中,我们就能删除这个把这个已完成的 Promise
从 executing
中删除
对于最后的 while
循环,当“池子”中的 Promise
数量已经小于我们指定的数量时,那么可以明确 for
内部的 yield await consume()
就不会执行到
但是剩下的 Promise
我们又不能不去 consume
进而获取结果(因为这个 asyncPool
这个函数是要获取结果返回的)
所以最后我们需要把“池子”里的 Promise
都 consume
掉,这样传入的所有 Promise
都能正确的被返回
当然,这里使用的是比较新的语法,比如 async/await
关键字,以及生成器函数
回到使用上,官方给了我们一个小例子,如下
可以发现,它的传参是根据参数不同来生成不同的 Promise
的,但实际上,生成的 Promise
并不是这么简单的
更多的时候,我们需要更加灵活的参数,比如下面的调用
myAsyncPool(
// 指定最大并发个数
3,
// 每个异步的工厂函数
[
() => axios.get("/abc"),
() => axios.delete("/a")
]
)
为了达成上面的效果,我们可以对源代码进行简单地修改,修改后如下
这里用了点 ts
泛型,整体大框架和源码保持不变
export async function* asyncPool<T>(
concurrency: number,
iterable: Iterable<() => Promise<T>>
) {
type WrapperPromise = Promise<[WrapperPromise, T]>;
const executing = new Set<WrapperPromise>();
async function consume() {
const [promise, value] = await Promise.race(executing);
executing.delete(promise);
return value;
}
for (const item of iterable) {
const promise = item().then((val) => [promise, val]) as WrapperPromise;
executing.add(promise);
if (executing.size >= concurrency) {
yield await consume();
}
}
while (executing.size) {
yield await consume();
}
}
看起来很不错,不过这还是不够灵活,比如我们无法一次性的传入所有的异步工厂函数
即我们需要分离创建“池子”和往“池子”放异步函数这两个操作,所以我们可以按照核心逻辑写出下面的代码
export class AsyncPool<T> {
private concurrency: number;
private running: Set<Promise<T>>;
private waiting: Array<() => Promise<T>>;
constructor(concurrency: number, iterable: Iterable<() => Promise<T>>) {
this.concurrency = concurrency;
this.running = new Set();
this.waiting = [];
for (const item of iterable) {
this.add(item);
}
}
add(promise: () => Promise<T>) {
if (this.running.size < this.concurrency) {
const p = promise().then((val) => {
this.running.delete(p);
while (
this.running.size < this.concurrency &&
this.waiting.length > 0
) {
const waitPromiseFactory = this.waiting.shift()!;
this.add(waitPromiseFactory);
}
return val;
});
this.running.add(p);
} else {
this.waiting.push(promise);
}
}
}
这里我们使用两个池子来表示,一个是正在执行的“池子”,一个是等待中的“池子”
每次我们添加一个 Promise
时,如果此时正在执行的“池子”还有位置,那么直接放入执行即可
如果此时正在执行的池子没位置了,我们就先放入等待池子中
当正在执行的“池子”中某一个 Promise
执行完毕之后,我们需要删除执行“池子”中对应的 Promise
然后我们需要判断,如果此时等待“池子”中有未处理的 Promise
,那么就可以加入执行“池子”中开始执行
和上面版本对比的话,缺点就是没法使用 for await
了,适合零散的 Promise
的并发控制