java - 有条件地结合 Mono 和 Flux
问题描述
我需要结合两个反应式发布者的结果——Mono 和 Flux。zip
我试图用and函数来做join
,但我无法满足两个特定条件:
- 结果应该包含与 Flux 发出的一样多的元素,但对应的 Mono 源应该只调用一次(仅此条件可以用 实现
join
) - 当 Flux 为空时,链应该完成而不等待 Mono 元素
第一个条件的解决方案在将Mono 与 Flux 组合条目(粘贴在下面)中介绍。但是我无法在不阻塞链条的情况下达到第二个条件——我想避免这种情况。
Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();
Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();
List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
return x + y;
}).collectList().block();
System.out.println(list);
解决方案
如果您想在 Flux 为空的情况下取消整个操作,您可以执行以下操作
Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();
//Uncomment below and comment out above statement for empty flux
//Flux<Integer> flux = Flux.empty();
Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(5000)).log();
//Throw exception if flux is empty
flux = flux.switchIfEmpty(Flux.error(IllegalStateException::new));
List<String> list = flux
.join(mono, s -> Flux.never() , s -> Flux.never(), (x, y) -> x + y)
//Catch exception and return nothing
.onErrorResume(s -> Flux.empty())
.collectList().block();
System.out.println(list);
如果您希望 Mono 完成但不想加入挂起,您可以执行以下操作
DirectProcessor<Integer> processor = DirectProcessor.create();
//Could omit sink, and use processor::onComplete in place of sink::complete
//But typically recommended as provides better thread safety
FluxSink<Integer> sink = processor.serialize().sink();
Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();
//Uncomment below and comment out above statement for empty flux
//Flux<Integer> flux = Flux.empty();
Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(5000)).log();
List<String> list = flux
.doOnComplete(sink::complete)
.join(mono, s -> processor , s -> processor, (x, y) -> x + y).collectList().block();
System.out.println(list);
推荐阅读
- java - 伽罗瓦域 GF(4) 中乘法的常数时间
- c# - 是否可以在添加实体框架 5 期间使用 EF 函数转换?
- kubernetes - 如何在 istio 虚拟服务中正确路由请求?
- python - 如何从 RichTextField 中删除标签并计算 RichTextField Django 中保存的单词?
- esp32 - ESP32 偶尔与多个 I2C 设备崩溃,堆问题?
- javascript - 如何在 cshtml 页面中使用 css @keyframes?
- android - 为什么我不能从 Android 向实时数据库发送数据?
- docker - 发布 FastAPI docker-container 时出现 422 Unprocessable Entity 错误,但在未 dockerized 时有效
- function - 有没有像 sqlite3“函数”这样的东西?
- r - 计算 R 中每一列的出现次数