首页 > 解决方案 > RxJava2 connectableObservables - autoConnect(2) - 为什么不等待 2 个订阅者调用 connect?

问题描述

我创建了一个如下所示的 connectedObservable:

final List<Integer> list = new ArrayList<Integer>();
        for(int j=1;j<=3;j++)
            list.add(j);

        Observable<Integer> observable = Observable.fromIterable(list);

这个 observable 发出 1, 2, 3

现在我以这种方式将其转换为可连接的Observable:

observable.publish().autoConnect(2);

因此,由于我将 2 传递给 autoConnect,因此我期待它在找到 2 个订阅者之前不会触发。但它只在找到的 1 个订阅者上执行。

让我告诉你我尝试了什么:

observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer)  {
                Log.v("consumer1:", ""+integer);

            }
        });

我期望这不会在日志中打印任何内容。但相反,我得到以下信息:

consumer1:: 1
consumer1:: 2
consumer1:: 3

根据autoConnect(int numberOfSubscribers) 上的文档:

* @param numberOfSubscribers the number of subscribers to **await** before calling connect
 *                            on the ConnectableObservable. A non-positive value indicates
 *                            an immediate connection.

标签: rx-java2

解决方案


在 RxJava 中,你不应该忽略操作符的返回值:

Observable<Integer> shared = observable.publish().autoConnect(2);

shared.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer)  {
            Log.v("consumer1:", ""+integer);
        }
    });

shared.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer)  {
            Log.v("consumer2:", ""+integer);
        }
    });

推荐阅读:https ://github.com/ReactiveX/RxJava#simple-background-computation


推荐阅读