project-reactor - GroupedFlux 中的 Reactor delayElements 延迟所有组中的元素
问题描述
我有一个用例,我想通过 PartitionKey 创建一堆 GroupedFlux,并在每个组延迟元素内创建 100 毫秒。但是,我希望多个组同时开始。因此,如果有 3 个组,我预计每 100 毫秒发出 3 条消息。但是,使用以下代码,我每 100 毫秒只看到 1 条消息。
这是我期望工作的代码。
final Flux<GroupedFlux<String, TData>> groupedFlux =
flux.groupBy(Event::getPartitionKey);
groupedFlux.subscribe(g -> g.delayElements(Duration.ofMillis(100))
.flatMap(this::doWork)
.doOnError(throwable -> log.error("error: ", throwable))
.onErrorResume(e -> Mono.empty())
.subscribe());
这是日志。
21:24:29.318 parallel-5] : GroupByKey : 2
21:24:29.424 parallel-6] : GroupByKey : 3
21:24:29.529 parallel-7] : GroupByKey : 1
21:24:29.634 parallel-8] : GroupByKey : 2
21:24:29.739 parallel-9] : GroupByKey : 3
21:24:29.844 parallel-10] : GroupByKey : 1
21:24:29.953 parallel-11] : GroupByKey : 2
21:24:30.059 parallel-12] : GroupByKey : 3
21:24:30.167 parallel-1] : GroupByKey : 1
(查看每个日志语句之间几乎 100 毫秒的差异。1s 列是时间戳。
解决方案
经过更多分析,我发现它运行良好。我的测试有不正确的 PartitionKey 数据,导致单个GroupedFlux。
回答我自己的问题,以防有人怀疑 delayElements 在 groupedFlux 上的工作方式不同。它不是。
推荐阅读
- python - 摆脱 Qt Creator 中 python 文件中的错误和警告
- python - 为什么 SQLite (Python) 说“ORGANIZATION”表已经存在?
- c++ - 在函数中转换为模板类
- gitea - CreateOrAppendToCustomConf() [F] 未能创建 '/etc/gitea/app.ini': mkdir /etc/gitea: 权限被拒绝
- python - TypeError:源对象必须是表面
- android - Divider 颜色 DatePicker Android
- python - 单击“查看”按钮时出现此 Django 错误 MultiValueDictKeyError
- google-apps-script - 如何使用谷歌应用程序脚本中的另一个函数停止一个函数?
- stream - 使用 CheckedInputStream/CheckedOutputStream 压缩/解压缩时校验和不匹配
- r - 如何对 R 执行 Wald 检验以显示参数对总体的影响?