created at 2025/10/11 01:13:53
updated at 2025/10/11 01:13:53
TypeScript
/**
* 表示一个任务执行后的结果,与 Promise.allSettled 的返回值结构一致。
*/
type SettledResult<T> =
| { status: "fulfilled"; value: T }
| { status: "rejected"; reason: unknown };
/**
* 创建一个并发限制的函数。
* @param limit - 最大并发数。
* @returns 一个新的函数,该函数接受一个任务数组并返回一个 Promise,该 Promise resolve 为所有任务的结果。
*/
export function pLimit<T, R>(
limit: number,
): (tasks: T[], executor: (task: T) => Promise<R>) => Promise<SettledResult<R>[]> {
// 参数校验
if (limit < 1) {
throw new TypeError("Concurrency limit must be a positive number.");
}
return (tasks: T[], executor: (task: T) => Promise<R>): Promise<SettledResult<R>[]> => {
// 如果没有任务,直接返回一个空数组
if (tasks.length === 0) {
return Promise.resolve([]);
}
const results: SettledResult<R>[] = new Array(tasks.length);
let activeCount = 0; // 当前正在执行的任务数
let taskIndex = 0; // 下一个要执行的任务索引
return new Promise((resolve) => {
/**
* 尝试执行下一个任务
*/
const run = async () => {
// 如果所有任务都已分配,则返回
if (taskIndex >= tasks.length) {
return;
}
// 获取当前任务并更新索引
const currentIndex = taskIndex++;
const task = tasks[currentIndex];
activeCount++;
try {
// 执行任务
const result = await executor(task);
// 将成功结果存入数组
results[currentIndex] = { status: "fulfilled", value: result };
} catch (error) {
// 将失败原因存入数组
results[currentIndex] = { status: "rejected", reason: error };
} finally {
// 任务完成,活跃任务数减一
activeCount--;
// 尝试启动下一个任务
next();
}
};
/**
* 检查是否可以启动下一个任务,或者是否所有任务都已完成
*/
const next = () => {
// 如果还有任务待执行,并且活跃任务数未达到上限
if (taskIndex < tasks.length && activeCount < limit) {
// 启动一个新任务
run();
}
// 如果所有任务都已分配,并且没有活跃任务了,说明全部完成
else if (activeCount === 0 && taskIndex >= tasks.length) {
// 解析最终的 Promise
resolve(results);
}
};
// 初始启动,最多启动 'limit' 个任务
for (let i = 0; i < limit && i < tasks.length; i++) {
run();
}
});
};
}