首页 > 解决方案 > 如何避免 onError 停止 rxjava 中的异步流

问题描述

让我直奔问题。我有一个要进行的网络调用列表。我正在使用merge运算符RxJava来合并网络调用列表并按顺序执行它们,最后得到输出列表。现在我面临一个问题。如果任何网络调用在下一次网络调用之间失败,则不会进行并且执行停止。如何确保列表中的所有网络调用无论成功或失败都被执行?

Observable<String> op1 = service1.operation();
Observable<String> op2 = service2.operation();
Observable<String> op3 = service3.operation();

Observable<List<String>> lst = Observable.merge(op1, op2, op3).toList();

在这里,如果我观察到lst它应该返回一个Strings包含成功或失败结果的列表。

标签: androidrx-java

解决方案


异常是terminal eventrxjava 中的一个,它终止它发生的流(以及任何外部流,如果您不使用任何运算符来处理它)。但是,如果您将该流包装在内部流中并应用运算符onErrorReturnItem,则可以避免它。无论如何,内部流将被终止,但不是onError通过应用onErrorReturnItem运算符传递给外部流,而是将传递onNext给外部流,这样外部流就不会被终止:

public static Observable<String> serviceOperation(String param) {
    return Observable.just(param)
            .doOnNext(s -> {
                //simulate error when param is "value 2"
                if (s.equals("value 2")) {
                    throw new IllegalStateException("Something happened");
                }
            });
}

public static void main(String args[]) {
    //here we create a stream and by using concatMap creating another inner stream
    Observable<String> op1 = Observable.just(1)
            .concatMap(integer -> {
                return serviceOperation("value 1")
                        .onErrorReturnItem("Error");
            });

    Observable<String> op2 =Observable.just(1)
            .concatMap(integer -> {
                return serviceOperation("value 2")
                        .onErrorReturnItem("Error");
            });

    Observable<String> op3 = Observable.just(1)
            .concatMap(integer -> {
                return serviceOperation("value 3")
                        .onErrorReturnItem("Error");
            });

    Observable.merge(op1, op2, op3)
            .toList()
            .subscribe(strings -> strings.forEach(System.out::println), Throwable::printStackTrace);
}

这将打印:

value 1
Error
value 3

推荐阅读