首页 > 解决方案 > 使用 groupBy 进行 Flux 并行串行执行

问题描述

说我有这个:

Flux<GroupedFlux<Integer, Integer>> intsGrouped = Flux.range(0, 12)
   .groupBy(i -> i % 3);

并说我有一个方法:

Mono<Integer> getFromService(Integer i);

我想getFromService为每个组并行调用,但要确保每个组中的调用是串行的。

对于上面的示例,这将是三个具有这些输入值的并行流:

stream 1: 0 -> 3 -> 6 -> 9
stream 2: 1 -> 4 -> 7 -> 10
stream 3: 2 -> 5 -> 8 -> 11

我试过这个,但它没有做我想要的:

Flux.range(0, 12)
   .groupBy(i -> i % 3)
   .flatMap(g -> g.flatMap(i -> getFromService(g.key(), i)))

这是一次为所有整数并行调用服务。我该如何进行?

标签: javaproject-reactor

解决方案


使用concatMapflatMapSequential代替内部.flatMap

如果您希望在每个组内顺序执行(即每个组内一次只有一个订阅getFromService),请使用.concatMap,如下所示:

Flux.range(0, 12)
   .groupBy(i -> i % 3)
   .flatMap(g -> g.concatMap(i -> getFromService(g.key(), i)))

如果组内的并行执行问题,但您只关心发出序列的顺序,则使用flatMapSequential,如下所示:

Flux.range(0, 12)
    .groupBy(i -> i % 3)
    .flatMap(g -> g.flatMapSequential(i -> getFromService(g.key(), i)))

另一种选择是使用设置为.flatMapconcurrency参数1,但我建议使用上述方法之一。


推荐阅读