首页 > 解决方案 > 有条件地结合 Mono 和 Flux

问题描述

我需要结合两个反应式发布者的结果——Mono 和 Flux。zip我试图用and函数来做join,但我无法满足两个特定条件:

  1. 结果应该包含与 Flux 发出的一样多的元素,但对应的 Mono 源应该只调用一次(仅此条件可以用 实现join
  2. 当 Flux 为空时,链应该完成而不等待 Mono 元素

第一个条件的解决方案在将Mono 与 Flux 组合条目(粘贴在下面)中介绍。但是我无法在不阻塞链条的情况下达到第二个条件——我想避免这种情况。

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
        Mono.just(2).delayElement(Duration.ofMillis(500))).log();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();

List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
    return x + y;
}).collectList().block();

System.out.println(list);

标签: javaproject-reactor

解决方案


如果您想在 Flux 为空的情况下取消整个操作,您可以执行以下操作

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
            Mono.just(2).delayElement(Duration.ofMillis(500))).log();

//Uncomment below and comment out above statement for empty flux

//Flux<Integer> flux = Flux.empty();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(5000)).log();

//Throw exception if flux is empty
flux = flux.switchIfEmpty(Flux.error(IllegalStateException::new));

List<String> list = flux
        .join(mono, s -> Flux.never() , s -> Flux.never(), (x, y) -> x + y)
        //Catch exception and return nothing
        .onErrorResume(s -> Flux.empty())
        .collectList().block();

    System.out.println(list);

如果您希望 Mono 完成但不想加入挂起,您可以执行以下操作

DirectProcessor<Integer> processor = DirectProcessor.create();
//Could omit sink, and use processor::onComplete in place of sink::complete
//But typically recommended as provides better thread safety
FluxSink<Integer> sink = processor.serialize().sink();

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
    Mono.just(2).delayElement(Duration.ofMillis(500))).log();

//Uncomment below and comment out above statement for empty flux

//Flux<Integer> flux = Flux.empty();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(5000)).log();

List<String> list = flux
     .doOnComplete(sink::complete)
     .join(mono, s -> processor , s -> processor, (x, y) -> x + y).collectList().block();

System.out.println(list);

推荐阅读