首页 > 解决方案 > RxJava 1.3 - amb 不订阅也不退订较慢的流

问题描述

    Observable<Object> obs1 = Observable
            .create(subscriber -> subscriber.onNext("obs 1 event"))
            .doOnSubscribe(() -> System.out.println("obs1 sub"))
            .doOnUnsubscribe(() -> System.out.println("obs1 unsub"));

    Observable<Object> obs2 = Observable
            .create(subscriber -> subscriber.onNext("obs 2 event"))
            .doOnSubscribe(() -> System.out.println("obs2 sub"))
            .doOnUnsubscribe(() -> System.out.println("obs2 unsub"));

    Observable
            .amb(obs1, obs2)
            .subscribe(System.out::println);

    Thread.sleep(500);

obs2 doOn* 方法应该被调用,并且应该只发出两个事件中的一个。程序输出:

obs1 sub
obs 1 event

没有调用 obs2 的 *subscribe 方法。

标签: rx-java

解决方案


默认情况下,RxJava 源代码和操作符是同步的,除非它们使用调度程序来引入异步。您上面的代码都没有涉及任何调度程序,因此执行将是同步的。amb不会尝试订阅第二个,因为当时第一个已经赢得了比赛。

当我用它替换 create 时,just它​​会像我预期的那样工作

产生不同结果的原因just是背压,您没有在不推荐create使用的情况下实施。amb先订阅源,然后再从源请求,因此您会得到订阅的副作用。由于您的实施中断,第一个来源会立即推送其项目,从而amb进入其胜利状态,从而防止发生第二次订阅。


推荐阅读