首页 > 解决方案 > 当找到第一个结果时,RxJava 中断并行执行

问题描述

我正在运行一些并行延迟的服务。任务是在所有服务完成执行时不要等待。

    Observable.just(2, 3, 5)
                .map(delay -> serviceReturningSingleWithDelay(delay))
                .toList()
                .flatMap(list ->
                        Single.zip(list, output -> Arrays.stream(output)
                                .map(delay -> (Integer) delay)
                                .filter(delay -> delay == 3)
                                .findFirst()
                                .orElse(0)
                        ))
                .subscribe(System.out::println);
 private Single<Integer> serviceReturningSingleWithDelay(Integer delay) {
        return Single.just(delay)
                .delay(delay, TimeUnit.SECONDS)
                .doOnSuccess(s -> System.out.printf("Delay %d: Thread : %s \n", delay, Thread.currentThread().getName()));
    }

现在我的输出是:

Delay 2: Thread : RxComputationThreadPool-1 
Delay 3: Thread : RxComputationThreadPool-2 
Delay 5: Thread : RxComputationThreadPool-3 
3

期望的结果是在RxComputationThreadPool-3线程完成执行之前获得过滤值 - 3 。我会感谢任何想法。

标签: javamultithreadingrx-javarx-java2

解决方案


如果您想并行运行它们并在收到值 3 时退出,则不需要使用zip. 而是使用takeWhile如下方式中断您的可观察对象:

Observable.just(2, 3, 5)
          .flatMapSingle(this::serviceReturningSingleWithDelay)
          .takeWhile(e -> e != 3)
          .subscribe(System.out::println);

如果你想要3使用价值takeUntil(e -> e == 3)而不是takeWhile(e -> e != 3)


推荐阅读