rx-java2 - RxJava2 connectableObservables - autoConnect(2) - 为什么不等待 2 个订阅者调用 connect?
问题描述
我创建了一个如下所示的 connectedObservable:
final List<Integer> list = new ArrayList<Integer>();
for(int j=1;j<=3;j++)
list.add(j);
Observable<Integer> observable = Observable.fromIterable(list);
这个 observable 发出 1, 2, 3
现在我以这种方式将其转换为可连接的Observable:
observable.publish().autoConnect(2);
因此,由于我将 2 传递给 autoConnect,因此我期待它在找到 2 个订阅者之前不会触发。但它只在找到的 1 个订阅者上执行。
让我告诉你我尝试了什么:
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.v("consumer1:", ""+integer);
}
});
我期望这不会在日志中打印任何内容。但相反,我得到以下信息:
consumer1:: 1
consumer1:: 2
consumer1:: 3
根据autoConnect(int numberOfSubscribers) 上的文档:
* @param numberOfSubscribers the number of subscribers to **await** before calling connect * on the ConnectableObservable. A non-positive value indicates * an immediate connection.
解决方案
在 RxJava 中,你不应该忽略操作符的返回值:
Observable<Integer> shared = observable.publish().autoConnect(2);
shared.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.v("consumer1:", ""+integer);
}
});
shared.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.v("consumer2:", ""+integer);
}
});
推荐阅读:https ://github.com/ReactiveX/RxJava#simple-background-computation
推荐阅读
- python - 为什么访问外部范围中定义的变量需要全局关键字?
- ansible - 当条件下跳过剧本任务
- python-3.x - WTForms 不验证输入
- c# - System.IndexOutOfRangeException C# 中的 SQL Server 查询
- javascript - 通过使用 LODASH 比较 2 个数组来寻找缺失的信息
- clang - 如何从 clang 分析器中获取爆炸图
- android - 如何防止启动新活动以完成任务?
- python - python selenium chrome驱动程序进程超过系统pid max,如何杀死它们?
- angular - Angular 在 URL 中设置标题而不是 id
- html - 为什么我的班级没有覆盖通用星号选择器?