首页 > 解决方案 > 什么是分组通量,我们如何使用它?

问题描述

我正在研究某个对象的通量,可以说Flux < MovieReservation >。这包含电影 ID、名称、时间、标题等信息。所以我想提取有助于创建新
Flux < MovieShowDetail >. 我的意图是按电影 ID 对所有预订进行分组,并将 Flux 分解为一组更小和多个 Flux(如果是这样的话,就是 Flux)。就像是

Flux {
   movie1 -> Flux<MovieShowDetail>
   movie2 -> Flux<MovieShowDetail>
   ... and so on
}

所以我遇到了这个 groupBy 方法,它应该只做这样的事情。然而,文档确实没有这方面的内容,尤其是关于如何迭代每部电影及其各自的 Flux。

此外,当我尝试通过尝试和错误来学习时,处理在 groupBy 方法之前的操作后停止。

我试过做

fluxOfSomething
.groupBy( movieReservation -> movieReservation.getMovieId ,
movieReservation -> movieReservation) 

这样我就可以遍历每个通量并创建 MovieShowDetail 的新通量。但是,处理永远不会进入此块。我尝试记录东西,但流程从未进入它。

flux
    .map( movieSomething -> do something)
    .groupBy( movieReservation -> 
        movieReservation.getMovieId , movieReservation ->
    movieReservation)
    .subscribe("This text doesn't get printed");

我真的需要尽可能多的信息。

标签: javaspringreactive-programmingspring-webfluxproject-reactor

解决方案


groupBy产生 a Flux<Flux<T>>(或更准确地说是 a Flux<GroupedFlux<T>>,它公开了每个组的键)。

AGroupedFlux和 a 一样Flux,必须订阅才能激活。所以你需要做的就是以某种方式消耗产生的Flux内在groupBy

一种典型的方法是使用flatMap,它已经进行了Function<T, Flux>转换。该函数可以很简单Function.identity()(但如果您想进一步处理内部的每个元素,Flux您可能应该从内部进行处理flatMap Function(因为组键在该 lambda 的范围内)。

movieReservations
    .groupBy(MovieReservation::movieId)
    .flatMap(idFlux -> idFlux
        .collectList()
        .map(listOfReservations ->
            new MovieInformation(idFlux.key(), listOfReservations)
        )
    );

推荐阅读