project-reactor - 将 Flux 拆分为 Mono 头和 Flux 尾
问题描述
我想将 aFlux
分成两部分:AMono
代表第一个元素(头部),aFlux
代表其他所有元素(尾部)。在此过程中,不应重新订阅
基础。Flux
不起作用的示例:
final Flux<Integer> baseFlux = Flux.range(0, 3).log();
final Mono<Integer> head = baseFlux.next();
final Flux<Integer> tail = baseFlux.skip(1L);
assertThat(head.block()).isEqualTo(0);
assertThat(tail.collectList().block()).isEqualTo(Arrays.asList(1, 2));
日志如下所示,如您所见,基本 Flux 将被重新订阅两次:
[main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(0)
[main] INFO reactor.Flux.Range.1 - | cancel()
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(0)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onComplete()
[main] INFO reactor.Flux.Range.1 - | request(1)
我的实际情况是,我的基础Flux
包含 CSV 文件的行,第一行是文件的标题,这是解析所有后续行所必需的。基础Flux
只能订阅一次,因为它基于InputStream
我为此找到的唯一相关资源是这个问题,但我发现这有点不适合我的需求。
解决方案
感谢评论中提供的建议,我能够设计以下解决方案:
final Flux<Integer> baseFlux = Flux.range(0, 3).log();
final Flux<? extends Tuple2<? extends Integer, Integer>> zipped = baseFlux
.switchOnFirst((signal, flux) -> (signal.hasValue()
? Flux.zip(Flux.just(signal.get()).repeat(), flux.skip(1L))
: Flux.empty()));
final List<? extends Tuple2<? extends Integer, Integer>> list = zipped.collectList().block();
assertThat(list.stream().map(Tuple2::getT1)).isEqualTo(Arrays.asList(0, 0));
assertThat(list.stream().map(Tuple2::getT2)).isEqualTo(Arrays.asList(1, 2));
它Flux
通过与原始通量的尾部重复压缩该元素来转换第一个元素之后的基础。它只订阅baseFlux
一次。
我不确定这是最好的解决方案,因为它会创建很多Tuple2
最终将被 GC 处理的对象,与基于 的有状态(“热”)通量的解决方案相比baseFlux
,它保留了原始订阅活着。
推荐阅读
- nuxt.js - 如何忽略 Nuxt.js 起始问题
- javascript - 使用 Framer Motion 让一个元素淡入另一个元素
- c# - ComboxBox 提示新表单
- python - re.search 错误:TypeError:预期的字符串或类似字节的对象
- python - 在 python 线程工作者中访问全局变量
- haskell - 如何在 Haskell 中将值列表显示为逗号分隔的字符串列表
- android - ASCII 艺术在 Android 设备上的呈现方式不同?
- firebase - 超出最大调用堆栈大小(在 Nuxt + Firebase 项目中)
- python-3.x - 如何从单引号Python中剥离字符串
- c++ - 在函数中使用按值调用和按引用传递参数的区别