首页 > 解决方案 > 将 Flux 拆分为 Mono 头和 Flux 尾

问题描述

我想将 aFlux分成两部分:AMono代表第一个元素(头部),aFlux代表其他所有元素(尾部)。在此过程中,不应重新订阅
基础。Flux

不起作用的示例:

final Flux<Integer> baseFlux = Flux.range(0, 3).log();
final Mono<Integer> head = baseFlux.next();
final Flux<Integer> tail = baseFlux.skip(1L);
assertThat(head.block()).isEqualTo(0);
assertThat(tail.collectList().block()).isEqualTo(Arrays.asList(1, 2));

日志如下所示,如您所见,基本 Flux 将被重新订阅两次:

[main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(0)
[main] INFO reactor.Flux.Range.1 - | cancel()
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(0)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onComplete()
[main] INFO reactor.Flux.Range.1 - | request(1)

我的实际情况是,我的基础Flux包含 CSV 文件的行,第一行是文件的标题,这是解析所有后续行所必需的。基础Flux只能订阅一次,因为它基于InputStream

我为此找到的唯一相关资源是这个问题,但我发现这有点不适合我的需求。

标签: project-reactorreactor

解决方案


感谢评论中提供的建议,我能够设计以下解决方案:

final Flux<Integer> baseFlux = Flux.range(0, 3).log();
final Flux<? extends Tuple2<? extends Integer, Integer>> zipped = baseFlux
    .switchOnFirst((signal, flux) -> (signal.hasValue()
        ? Flux.zip(Flux.just(signal.get()).repeat(), flux.skip(1L))
        : Flux.empty()));
final List<? extends Tuple2<? extends Integer, Integer>> list = zipped.collectList().block();
assertThat(list.stream().map(Tuple2::getT1)).isEqualTo(Arrays.asList(0, 0));
assertThat(list.stream().map(Tuple2::getT2)).isEqualTo(Arrays.asList(1, 2));

Flux通过与原始通量的尾部重复压缩该元素来转换第一个元素之后的基础。它只订阅baseFlux一次。

我不确定这是最好的解决方案,因为它会创建很多Tuple2最终将被 GC 处理的对象,与基于 的有状态(“热”)通量的解决方案相比baseFlux,它保留了原始订阅活着。


推荐阅读