首页 > 解决方案 > 如何在所有项目被消耗之前终止 Observables 和 ovserver 之间的连接

问题描述

我想知道如何在所有 obsevabes 被消耗之前尽早终止 observables 和观察者之间的连接。我知道它可以通过一次性完成..但是我怎样才能参考下面发布的示例给出的一次性对象提前谢谢

代码

Observable<List<List<Person>>> observables = Observable.just(Main.getPersons());
    observables
    .concatMap(ll->{
        //how to display the size of List<List<Person>>
        //System.out.println("ll: " + ll.size());
        return Observable.fromIterable(ll)
                .concatMap(l->Observable.fromIterable(l))
                .filter(p->p.getAge().orElse(-1) <44)
                .map(g->g.getName().map(s->s+"_test").get()+ "  " + g.getAge().orElse(0));
    }
            )
    //.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .blockingSubscribe(new Observer() {

        @Override
        public void onComplete() {
            // TODO Auto-generated method stub

        }

        @Override
        public void onError(Throwable arg0) {
            // TODO Auto-generated method stub
            System.out.println("onError: " + arg0);
        }

        @Override
        public void onNext(Object arg0) {
            // TODO Auto-generated method stub
            System.out.println("onNext: " + arg0);
        }

        @Override
        public void onSubscribe(Disposable arg0) {
            // TODO Auto-generated method stub

        }
    });

标签: java-8rx-javaobservablerx-java2

解决方案


推荐阅读