rx-java - RxJava 1.3 - amb 不订阅也不退订较慢的流
问题描述
Observable<Object> obs1 = Observable
.create(subscriber -> subscriber.onNext("obs 1 event"))
.doOnSubscribe(() -> System.out.println("obs1 sub"))
.doOnUnsubscribe(() -> System.out.println("obs1 unsub"));
Observable<Object> obs2 = Observable
.create(subscriber -> subscriber.onNext("obs 2 event"))
.doOnSubscribe(() -> System.out.println("obs2 sub"))
.doOnUnsubscribe(() -> System.out.println("obs2 unsub"));
Observable
.amb(obs1, obs2)
.subscribe(System.out::println);
Thread.sleep(500);
obs2 doOn* 方法应该被调用,并且应该只发出两个事件中的一个。程序输出:
obs1 sub
obs 1 event
没有调用 obs2 的 *subscribe 方法。
解决方案
默认情况下,RxJava 源代码和操作符是同步的,除非它们使用调度程序来引入异步。您上面的代码都没有涉及任何调度程序,因此执行将是同步的。amb
不会尝试订阅第二个,因为当时第一个已经赢得了比赛。
当我用它替换 create 时,
just
它会像我预期的那样工作
产生不同结果的原因just
是背压,您没有在不推荐create
使用的情况下实施。amb
先订阅源,然后再从源请求,因此您会得到订阅的副作用。由于您的实施中断,第一个来源会立即推送其项目,从而amb
进入其胜利状态,从而防止发生第二次订阅。
推荐阅读
- c# - C# Gridview 数据源到数据表
- ios - 如何静态或动态链接每个 Cocoapod 库?
- reactjs - Firebase doc.data() 是一个对象而不是字符串
- javascript - Twilio.Device 不是构造函数
- python - 如何将 MLPC 从 SKLearn 转换为 CoreML
- javascript - 如何在 Header 部分中的 JQuery 与 Body 部分中的 Javascript 之间共享数据?
- tensorflow2.0 - Tensorflow损失函数读数发散?
- bootstrap-4 - Django CKEditor 不验证引导表单
- arrays - 将由所有进程传播的数组的不同部分放入单个最终数组中的更好方法,在 C 中使用 MPI
- javascript - ipcRenderer 数据未发送到 Electron 中的 ipcMain