java - 使用 groupBy 进行 Flux 并行串行执行
问题描述
说我有这个:
Flux<GroupedFlux<Integer, Integer>> intsGrouped = Flux.range(0, 12)
.groupBy(i -> i % 3);
并说我有一个方法:
Mono<Integer> getFromService(Integer i);
我想getFromService
为每个组并行调用,但要确保每个组中的调用是串行的。
对于上面的示例,这将是三个具有这些输入值的并行流:
stream 1: 0 -> 3 -> 6 -> 9
stream 2: 1 -> 4 -> 7 -> 10
stream 3: 2 -> 5 -> 8 -> 11
我试过这个,但它没有做我想要的:
Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.flatMap(i -> getFromService(g.key(), i)))
这是一次为所有整数并行调用服务。我该如何进行?
解决方案
使用concatMap
或flatMapSequential
代替内部.flatMap
如果您希望在每个组内顺序执行(即每个组内一次只有一个订阅getFromService
),请使用.concatMap
,如下所示:
Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.concatMap(i -> getFromService(g.key(), i)))
如果组内的并行执行没问题,但您只关心发出序列的顺序,则使用flatMapSequential
,如下所示:
Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.flatMapSequential(i -> getFromService(g.key(), i)))
另一种选择是使用设置为.flatMap
的concurrency
参数1
,但我建议使用上述方法之一。
推荐阅读
- appium - 如何在 android studio 中为 android 应用编写测试用例并在 Appium 服务器中生成结果?
- swift - 在 Swift 中是否有 self.view.isUserInteractionEnabled = false 的替代方案?
- c# - 在 Word 中将 shape/inlineshape 设置为装饰
- json - 如何从json中获取数据并显示在表格中
- java - Spring集成和用户界面
- json - 为什么我在构建项目时出现 Flutter gradle 错误?
- wcf - wcf 中的 RoutingStyle SoapServiceRoutingStyle.RequestElement
- php - 如何在yii2中计算gridview进行单独计算
- python - Anaconda 安装 pycuda
- javascript - React.js 的 setState 没有更新 UI?