首页 > 解决方案 > 出错时继续执行链式 RxJava observable,仅在满足特定条件时停止执行

问题描述

如何继续执行 Retrofit 网络调用 observable ,该调用作为从另一个 observable 获得的输入作为输入,flatMapIterable即使遇到错误也会转换为,并且仅在遇到特定 HTTP 状态代码时才停止迭代?

我有一个JSON保存在共享首选项中的请求列表,我需要使用 Retrofit 一个一个地发送,只有在我得到某个 HTTP 状态代码时才会停止。如果我得到其他异常,我只需要继续发送请求列表中的下一个项目。每当请求收到成功的响应时,我都会从我的请求列表中删除该特定请求。如果其他请求遇到错误,它们不会从列表中删除,我会再次将它们保存到共享首选项中。

到目前为止,我在一个ViewModel对象内执行此操作。首先,我通过paramRepository.getSavedOfflineRequest()返回 RxJava 的方法 ()获取这些请求Observable<List<Request>>。我想遍历所有请求,以便我可以将项目作为输入发送到apiService.sale,这是我的 Retrofit 调用,所以我使用flatMapIterable. 如果请求成功,我删除请求并将Transaction对象保存到数据库。

public LiveData<UploadStatus> startUploading() {
    MutableLiveData<UploadStatus> uploadStatus = new MutableLiveData<>();
    compositeDisposable.add(paramRepository.getSavedOfflineRequest()
        .doOnComplete(() -> uploadStatus.setValue(UploadStatus.NO_ITEMS))
        .flatMapIterable( requests -> requests)
        .flatMapCompletable(request -> apiService.sale(saleUrl, BuildConfig.ApiKey,request)
            .doOnSuccess(response -> {
                requestList.remove(request);
                transactions.add(createTransaction(request, response));
            }).ignoreElement()
         )
        .andThen(saveUploadedToDb(transactions))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(() -> uploadStatus.setValue(UploadStatus.SUCCESS),
            error -> {
                Log.d(TAG, "error");
                if (error instanceof HttpException) {
                    HttpException httpException = (HttpException) error;
                    int statusCode = httpException.code();
                    if (statusCode == 401) {
                        Log.d(TAG, "logged out");
                        uploadStatus.setValue(UploadStatus.LOGGED_OUT);
                    }
                } else {
                    uploadStatus.setValue(UploadStatus.FAIL);
                }
            }));

    return uploadStatus;
}

我希望如果我遇到其他错误/异常,我会继续使用apiService.saleRequest一项进行调用。但是我注意到,当只遇到一个错误时,整个链就停止了,因此另一个Requests还没有发送。

我已经尝试过onErrorResumeNext,但它期望返回另一种类型的Exception,这与我想要的完全不同(对其他异常不做任何事情)。

标签: androidretrofit2rx-java2

解决方案


您可能希望 observable 返回 UploadStatus 并映射响应。

例如

    .map { response ->
        switch(response.code()) {
            case 200:
                return UploadStatus.SUCCESS;
            case 401:
                return UploadStatus.LOGGED_OUT;
            default:
                return UploadStatus.FAIL;
        }
     }

在出现异常的情况下,可以使用onErrorReturnUploadStatus.FAIL/ERROR 返回。这不会终止流。

    .onErrorReturn { UploadStatus.FAIL }

推荐阅读