java - 当找到第一个结果时,RxJava 中断并行执行
问题描述
我正在运行一些并行延迟的服务。任务是在所有服务完成执行时不要等待。
Observable.just(2, 3, 5)
.map(delay -> serviceReturningSingleWithDelay(delay))
.toList()
.flatMap(list ->
Single.zip(list, output -> Arrays.stream(output)
.map(delay -> (Integer) delay)
.filter(delay -> delay == 3)
.findFirst()
.orElse(0)
))
.subscribe(System.out::println);
private Single<Integer> serviceReturningSingleWithDelay(Integer delay) {
return Single.just(delay)
.delay(delay, TimeUnit.SECONDS)
.doOnSuccess(s -> System.out.printf("Delay %d: Thread : %s \n", delay, Thread.currentThread().getName()));
}
现在我的输出是:
Delay 2: Thread : RxComputationThreadPool-1
Delay 3: Thread : RxComputationThreadPool-2
Delay 5: Thread : RxComputationThreadPool-3
3
期望的结果是在RxComputationThreadPool-3线程完成执行之前获得过滤值 - 3 。我会感谢任何想法。
解决方案
如果您想并行运行它们并在收到值 3 时退出,则不需要使用zip
. 而是使用takeWhile
如下方式中断您的可观察对象:
Observable.just(2, 3, 5)
.flatMapSingle(this::serviceReturningSingleWithDelay)
.takeWhile(e -> e != 3)
.subscribe(System.out::println);
如果你想要3
使用价值takeUntil(e -> e == 3)
而不是takeWhile(e -> e != 3)
推荐阅读
- palantir-foundry - Foundry Transform 在重新分区、hive 分区和分桶的各种组合中输出多少文件?
- tree - 树问题以在每个级别找到给定数量的节点的直径
- java - 有没有办法使用自定义策略来映射 JDBI3 中的枚举类型?
- r - R:每次销售前最后一年的房屋销售量
- python - 使用 python 在 elasticsearch 中增加 max_result_window 时出错
- flutter - Flutter 布局(Columan 中的 6 个容器)
- html - 如何让基于 Blink 的浏览器让我在使用透视时滚动浏览整个内容?
- python-3.x - 窗口上的 Pyspark StandardScaler
- angular - `this` 值以角度变化
- python - 在模型表单中插入“隐藏的时间戳字段”?