首页 > 解决方案 > 顺序减少列表> 单声道, 如果有任何返回 false 则中止处理

问题描述

我有一个任务列表,这些任务是按顺序执行true的,如果任务成功或false失败则返回(非例外情况 - 第三种情况它遇到某种异常,这也应该中止处理,但我想将其视为不同的第三种情况)。

这些任务表示为Mono<Boolean>实例列表。它们可能有任意数量。从这个列表中,我想产生一个Mono<Boolean>具有以下特征的:

这是 3 个任务的示例列表 - 第一个成功,第二个失败 - 所以我不希望第三个运行:

Mono<Boolean> task1 = Mono.create(sink -> {
    try {
        System.out.println("Executing task 1...");

        // Simulate some work being done
        Thread.sleep(1000);

        System.out.println("Finished executing task 1.");

        // This task is successful, return true
        sink.success(true);
    } catch (InterruptedException e) {
        sink.error(e);
    }
});

Mono<Boolean> task2 = Mono.create(sink -> {
    try {
        System.out.println("Executing task 2...");

        // Simulate some work being done
        Thread.sleep(1000);

        System.out.println("Finished executing task 2.");

        // This task is NOT successful, return false
        sink.success(false);
    } catch (InterruptedException e) {
        sink.error(e);
    }
});

Mono<Boolean> task3 = Mono.create(sink -> {
    try {
        System.out.println("Executing task 3...");

        // Simulate some work being done
        Thread.sleep(1000);

        System.out.println("Finished executing task 3.");

        // This task is successful, return true
        sink.success(true);
    } catch (InterruptedException e) {
        sink.error(e);
    }
});

List<Mono<Boolean>> tasks = Arrays.asList(task1, task2, task3);

我的第一个想法是将Monos 列表转换为 a Flux,然后使用Flux#reduce

Mono<Boolean> process = Flux.concat(tasks)
    .reduce(true, (accum, value) -> accum && value);

process.subscribe(System.out::println);

这会正确生成false,这表明过程中的步骤之一未成功(步骤 2)。但是,根据输出,很明显任务 3 仍在执行,这是不正确的:

Executing task 1...
Finished executing task 1.
Executing task 2...
Finished executing task 2.
Executing task 3...
Finished executing task 3.
false

如果我回到同步世界并使用Mono#block,我可以获得所需的行为,但这感觉不是很惯用:

Mono<Boolean> process = Mono.create(sink -> {
    Boolean result = true;

    for (Mono<Boolean> task: tasks) {
        result = task.block();

        if (!result) {
            break;
        }
    }

    sink.success(result);
});

process.subscribe(System.out::println);

reduce使用, map,等内置运算符是否有更好的方法来做到这一点flatMap

标签: javaproject-reactor

解决方案


您可以takeUntil根据给定条件使用停止处理:

Mono<Boolean> process = Flux.concat(tasks)
    .takeUntil(x -> !x)
    .reduce(true, (accum, value) -> accum && value);

推荐阅读