首页 > 解决方案 > RxJava 返回错误为 onNext 并继续流

问题描述

所以我尝试使用onErrorReturn返回我想要的结果,但它会在之后完成流,我如何捕获错误返回为 Next 并仍然继续流?

使用下面的代码,当有错误时它不会到达,如果我把它翻转过来,如果有错误retryWhen它不会重新订阅retryWhen

fun process(): Observable<State> {
    return publishSubject
        .flatMap { intent ->
            actAsRepo(intent) // Might return error
                .map { State(data = it, error = null) }
        }
        .onErrorReturn { State(data = "", error = it) } // catch the error
        .retryWhen { errorObs -> 
            errorObs.flatMap {
                Observable.just(State.defaultState()) // continue subscribing
            }
        }
}

private fun actAsRepo(string: String): Observable<String> {
    if (string.contains('A')) {
        throw IllegalArgumentException("Contains A")
    } else {
        return Observable.just("Wrapped from repo: $string")
    }
}

订户将是

viewModel.process().subscribe(this::render)

标签: androidkotlinerror-handlingrx-javarx-java2

解决方案


onError 是一个终端操作符。如果发生 onError,它将在操作员之间传递。您可以使用 onError 操作符来捕获 onError 并提供回退。

在您的示例中,onError 发生在 flatMap 的内部流中。onError 将向下游传播到 onErrorReturn 操作符。如果你看一下实现,你会看到 onErrorReturn lambda 会被调用,结果会被推送到下游,onNext 跟在 onComplete 之后

    @Override
    public void onError(Throwable t) {
        T v;
        try {
            v = valueSupplier.apply(t);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            downstream.onError(new CompositeException(t, e));
            return;
        }

        if (v == null) {
            NullPointerException e = new NullPointerException("The supplied value is null");
            e.initCause(t);
            downstream.onError(e);
            return;
        }

        downstream.onNext(v); // <--------
        downstream.onComplete(); // <--------
    }

你的解决方案的结果是什么?

您的流完成是因为:#retryWhen JavaDoc

如果操作符的上游是异步的,则立即发出 onNext 后跟 onComplete 可能会导致序列立即完成。类似地,如果这个内部 {@code ObservableSource} 在上游活动时发出 {@code onError} 或 {@code onComplete} 信号,则序列会立即以相同的信号终止。

你应该做的:

将 onErrorReturn 放在 flatMap 中的 map opreator 后面。当inner-flatMap 流出现onErrors 时,使用此排序您的流将不会完成。

为什么是这样?

当外部(来源:publishSubject)和内部流(订阅)都完成时,flatMap 运算符完成。在这种情况下,外部流 (publishSubject) 发出 onNext,内部流将在通过 onNext 发送 { State(data = "", error = it) } 后完成。因此,流将保持打开状态。

interface ApiCall {
    fun call(s: String): Observable<String>
}

class ApiCallImpl : ApiCall {
    override fun call(s: String): Observable<String> {
        // important: warp call into observable, that the exception is caught and emitted as onError downstream
        return Observable.fromCallable {
            if (s.contains('A')) {
                throw IllegalArgumentException("Contains A")
            } else {
                s
            }
        }
    }
}

data class State(val data: String, val err: Throwable? = null)

apiCallImpl.call 将返回一个惰性可观察对象,这将在订阅时引发错误,而不是在可观察组装时。

// no need for retryWhen here, except you want to catch onComplete from the publishSubject, but once the publishSubject completes no re-subscription will help you, because the publish-subject is terminated and onNext invocations will not be accepted anymore (see implementation). 
fun process(): Observable<State> {
    return publishSubject
        .flatMap { intent ->
            apiCallImpl.call(intent) // Might return error
                .map { State(data = it, err = null) }
                .onErrorReturn { State("", err = it) }
        }
}

测试

lateinit var publishSubject: PublishSubject<String>
lateinit var apiCallImpl: ApiCallImpl

@Before
fun init() {
    publishSubject = PublishSubject.create()
    apiCallImpl = ApiCallImpl()
}

@Test
fun myTest() {
    val test = process().test()

    publishSubject.onNext("test")
    publishSubject.onNext("A")
    publishSubject.onNext("test2")

    test.assertNotComplete()
        .assertNoErrors()
        .assertValueCount(3)
        .assertValueAt(0) {
            assertThat(it).isEqualTo(State("test", null))
            true
        }
        .assertValueAt(1) {
            assertThat(it.data).isEmpty()
            assertThat(it.err).isExactlyInstanceOf(IllegalArgumentException::class.java)
            true
        }
        .assertValueAt(2) {
            assertThat(it).isEqualTo(State("test2", null))
            true
        }
}

选择

此替代方案的行为与第一个解决方案略有不同。flatMap-Operator 采用布尔值 (delayError),这将导致吞下 onError 消息,直到源完成。当源代码完成时,将发出错误。

您可以使用 delayError true,当异常没有用并且在出现时不得记录时

过程

fun process(): Observable<State> {
    return publishSubject
        .flatMap({ intent ->
            apiCallImpl.call(intent)
                .map { State(data = it, err = null) }
        }, true)
}

测试

只发出两个值。错误不会转换为备用值。

@Test
fun myTest() {
    val test = process().test()

    publishSubject.onNext("test")
    publishSubject.onNext("A")
    publishSubject.onNext("test2")

    test.assertNotComplete()
        .assertNoErrors()
        .assertValueAt(0) {
            assertThat(it).isEqualTo(State("test", null))
            true
        }
        .assertValueAt(1) {
            assertThat(it).isEqualTo(State("test2", null))
            true
        }
        .assertValueCount(2)
}

注意:我认为你想在这种情况下使用 switchMap,而不是 flatMap。


推荐阅读