首页 > 解决方案 > 等待两个 Observable 完成后再继续?

问题描述

我有一个加载页面,我想在其中执行两个网络请求(retrofit2-rxjava)以获取不相关的信息。在这两个请求完成之前,我不想进入下一页,即使其中一个或两个请求都失败了。

  1. 使用 zip 将请求捆绑在一起。有没有办法不被迫使用 BiFunction,而不必返回 null?

  2. 请求 A 和 B 有一个 .doOnNext 和 .doOnError。如果其中一个返回错误,zip observable 是否继续?zip 订阅者是否也返回错误?

  3. 这是最好的方法吗?

private Disposable retrieveBothThings() {
return Observable.zip(getThingA(), getThingB(),
                    (A, B) -> {
                        onAllCallsComplete();
                        return null;
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(o -> {}, Logger::e);
}
    
    

private Observable<...> getThingA() {
            return SessionManager.getInstance().getApi()
                    .getA()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(this::onACompleted)
                    .doOnError(this::onAFailed);
}

private Observable<...> getThingB() {
        return SessionManager.getInstance().getApi()
                .getB()
                .subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread()).toObservable()
                .doOnNext(this::onBSuccess)
                .doOnError(this::onBFailure);
}
    
private void onBSuccess(...) {
    ...        
}
    
private void onBFailure(final Throwable throwable) {
    Logger.e(throwable);
}

private void onACompleted(...) {
    ...        
}
    
private void onAFailed(final Throwable throwable) {
    Logger.e(throwable);
}

标签: javaandroidobservableretrofit2rx-java2

解决方案


您可以使用运算符组合两个可观察对象merge()。您可以将错误转换为onComplete()使用onErrorResumeNext().

Completable.merge(
  observable1
    .doOnNext(this::onACompleted)
    .doOnError(this::onAFailed)
    .onErrorResumeNext( Completable.complete() )
    .toCompletable(),
  observable2
    .doOnNext(this::onBCompleted)
    .doOnError(this::onBFailed)
    .onErrorResumeNext( Completable.complete() ),
    .toCompletable() )
.subscribe( ignore -> {}, 
            error -> {},
            () -> { processCompletion(); } );

推荐阅读