首页 > 解决方案 > Flux.groupBy() 后跟 Flux#next 不会对元素进行分组

问题描述

我观察到一种行为Flux#groupBy,我不确定这是一个错误还是来自我对反应堆的误解。

我有一个Flux要按公共字段分组的元素,然后只返回每个组的一个元素。我尝试过使用Flux#next,但最终结果令人惊讶,因为我仍然得到了助焊剂的所有元素。

Bar bar1 = new Bar(UUID.randomUUID())
Bar bar2 = new Bar(UUID.randomUUID())

Flux.just(new Foo(bar1), new Foo(bar1), new Foo(bar2), new Foo(bar2))
  .groupBy(foo -> foo.bar.id)
  .doOnNext(flux -> System.out.println("Grouped flux with key: " + flux.key()))
  .flatMap(Flux::next)
  .collectList()
  .block() # the final list here contains 4 elements

在上面的例子println中被调用了 4 次,其中 key 被重复了两次:

Grouped flux with key: 8a7b9135-97ce-48d7-a084-97f1bd3b6648
Grouped flux with key: 8a7b9135-97ce-48d7-a084-97f1bd3b6648
Grouped flux with key: 4e174753-0e25-4659-87f9-524f8b0edaf8
Grouped flux with key: 4e174753-0e25-4659-87f9-524f8b0edaf8

替换代码flatMap以使用Flux#collectList而不是Flux#next为我解决了这个问题:

Flux.just(new Foo(bar1), new Foo(bar1), new Foo(bar2), new Foo(bar2))
  .groupBy(foo -> foo.bar.id)
  .doOnNext(flux -> System.out.println("Grouped flux with key: " + flux.key()))
  .flatMap(flux -> flux.collectList().map(list -> list.get(0)))
  .collectList()
  .block() # the final list here contains 2 elements

我希望这两个片段都能正常工作,但显然由于某种原因它们没有。这种行为的解释是什么?

谢谢!

标签: javaproject-reactor

解决方案


解释是next()取消了开放组,因此在下面的元素groupBy上再次看到 key1,但该键没有开放组。结果,它重新创建了一个组并将第二次出现的bar1放入第二组。

由于collectList没有取消组,因此 key1 和 key2 组都保持打开状态,并且groupBy能够将传入Foo的 s 分派到两个组中。


推荐阅读