javascript - 在 RxJS 中以块的形式运行一组可观察对象
问题描述
我正在尝试以块的形式运行一个可观察对象池,并在两者之间添加一个间隔。我尝试了以下代码:
let i = 0;
from([].constructor(20)).pipe(
concatMap(a => of(i).pipe(delay(1000))), // add a delay
mergeMap(obj => of(i++), 5) // run 5 in parallel
).subscribe(res => {
console.log('done', new Date().toISOString(), res);
});
现在这增加了所有可观察的延迟,所以我得到的输出是:
done 2020-03-25T09:23:34.151Z 0
done 2020-03-25T09:23:35.151Z 1
done 2020-03-25T09:23:36.151Z 2
done 2020-03-25T09:23:37.151Z 3
done 2020-03-25T09:23:38.151Z 4
done 2020-03-25T09:23:39.151Z 5
done 2020-03-25T09:23:40.153Z 6
done 2020-03-25T09:23:41.155Z 7
done 2020-03-25T09:23:42.161Z 8
done 2020-03-25T09:23:43.163Z 9
done 2020-03-25T09:23:44.167Z 10
done 2020-03-25T09:23:45.170Z 11
done 2020-03-25T09:23:46.171Z 12
done 2020-03-25T09:23:47.177Z 13
done 2020-03-25T09:23:48.178Z 14
done 2020-03-25T09:23:49.182Z 15
done 2020-03-25T09:23:50.183Z 16
done 2020-03-25T09:23:51.186Z 17
done 2020-03-25T09:23:52.188Z 18
done 2020-03-25T09:23:53.192Z 19
正如你所看到的,它运行它们中的每一个并增加 1 秒的延迟。我想要实现的是先并行运行 5 次,然后添加 1 秒延迟,然后再运行下 5 次,依此类推。
我什至尝试在管道中交换 mergeMap 和 concatMap 顺序,但结果相同。
关于如何做到这一点的任何想法?
解决方案
您可以使用bufferCount
创建批次然后forkJoin
并行运行它们:
let i = 0;
const createRequest = () => of(i++);
from([].constructor(20)).pipe(
bufferCount(5),
concatMap(chunk => forkJoin(chunk.map(createRequest))
.pipe(delay(1000)) // add a delay
),
).subscribe(res => {
console.log('done', new Date().toISOString(), res);
});
推荐阅读
- python - 如何计算(第 1 行和第 2 行)和(第 3 行和第 4 行)之间的时间增量等等?
- c# - MongoDB BatchSize FindOptions 在 C# netcore 后端无法正常工作
- php - PrestaShop 交叉销售产品按销售排序 [PHP MOD]
- javascript - 使用 if 和 else 以及 charAt 循环数组
- c++ - 如何使用 CMake 启用 gdb -g3 调试级别?
- python - 为什么在第 12 行的深度优先搜索中列表(路径)与路径的输出存在差异?
- swift - 如何在 Swift 上使用带有大量 if 和 else 的 Clean 代码
- algorithm - 这是什么类型的问题?帮助分类
- global-variables - 基于文本的游戏中的 C++ 静态变量
- python - 如何在图像上添加肤色表情符号(黑色竖起大拇指,白色竖起大拇指)?