java-8 - 如何在所有项目被消耗之前终止 Observables 和 ovserver 之间的连接
问题描述
我想知道如何在所有 obsevabes 被消耗之前尽早终止 observables 和观察者之间的连接。我知道它可以通过一次性完成..但是我怎样才能参考下面发布的示例给出的一次性对象提前谢谢
代码:
Observable<List<List<Person>>> observables = Observable.just(Main.getPersons());
observables
.concatMap(ll->{
//how to display the size of List<List<Person>>
//System.out.println("ll: " + ll.size());
return Observable.fromIterable(ll)
.concatMap(l->Observable.fromIterable(l))
.filter(p->p.getAge().orElse(-1) <44)
.map(g->g.getName().map(s->s+"_test").get()+ " " + g.getAge().orElse(0));
}
)
//.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.blockingSubscribe(new Observer() {
@Override
public void onComplete() {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub
System.out.println("onError: " + arg0);
}
@Override
public void onNext(Object arg0) {
// TODO Auto-generated method stub
System.out.println("onNext: " + arg0);
}
@Override
public void onSubscribe(Disposable arg0) {
// TODO Auto-generated method stub
}
});
解决方案
推荐阅读
- winapi - 为什么 fopen 在 CreateFile 失败的地方成功?
- angular - Angular中的动画元素位置交换
- javascript - 将 SIP.js 与 React Native 集成
- routing - 有没有办法创建一个仅包含路由及其在 Rocket 中的功能的中央路由文件?
- java - 在 oracle 瘦驱动支持的 tnsnames.ora 中声明多个相同的服务
- django - django restframework token 导致 ImportError
- angularjs - 不加载 karma-jasmine 框架中生成的测试用例
- sql-server - 在基础查询中使用 REPLACE 打开 TClientDataSet 时内存不足
- dart - SlickDart 组件使用 AngularDart 错误
- json - 序列化外部节点模块的 JSON 返回到 KotlinJS 中的 Kotlin 类