首页 > 解决方案 > 在 RxJava 中合并两个具有独立和相等超时的流的正确方法

问题描述

我想知道如何最好地组合两个流,每个流在 RxJava 中都有相同持续时间的独立超时,而不会导致崩溃错误。

在 RxJava 中,如果运行以下代码,会导致崩溃异常:

val testObs = Single.fromCallable {
    Thread.sleep(10000)
}.subscribeOn(Schedulers.io())
        .timeout(5000L, TimeUnit.MILLISECONDS)
        .timeout(5000L, TimeUnit.MILLISECONDS)
        .test()

testObs.awaitTerminalEvent()

这是因为它们都在线程timeouts上执行,线程是或可以是多线程的。其中一个超时将成功并关闭流,第二个将引发崩溃异常:computation

io.reactivex.exceptions.UndeliverableException:无法将异常传递给消费者,因为它已经取消/处置了流程,或者异常无处可去。

显然,一个明显的解决方案是不要将两个相同持续时间的超时附加到单个流。

然而,让我们想象一个简单的Api类,它定义了两个方法:

  fun getNewMessages: Single<List<Messsage>>
  fun getUserProfileInfo: Single<Profile>

在您的代码中,假设您有单独Activity的 s,每个 s 都独立地调用这些方法;您还有一个StartupActivity调用这两种方法的方法,但用于Single.zip合并这些操作中的每一个,因此它基本上可以预先获取数据并在调用finish()启动启动屏幕之前配置应用程序并为用户准备好。这些方法中的每一个的实现都使用了一个通用的网络类,该类应用了.timeout30 秒的标准值。

不幸的是,如果有一个网络问题导致这两种方法都停止超出它们的默认.timeout值,Single.zip那么结合这些网络操作的 a 将会崩溃。

因此,是否有推荐的模式来组合多个信号,每个信号都具有可能相等并因此同时触发的独立超时值?如果我们采用“除非您是最终消费者,否则不添加超时”的方法,最终将导致.timeout(30, TimeUnit.SECOND)整个代码中出现大量复制粘贴调用。

谢谢!

标签: androidtimeoutobservablerx-java

解决方案


UndeliverableException例外是包装:

java.util.concurrent.TimeoutException:源在 5000 毫秒内没有发出事件信号并且已终止

您对timeout运算符是正确的,默认情况下它在计算调度程序中执行。以相同的持续时间链接它们将导致不确定的结果。有时第一个会触发TimeoutException有时第二个。

您可以通过使用其中一种方法并发出组合对象的特殊实例来避免zip运算符崩溃。onErrorX前任 :

long CONFIGURED_TIMEOUT = 300L;

Single<String> getNewMessages = Single.fromCallable(() -> {
    try {
        Thread.sleep(1000);
    } catch (Exception e) {
    }
    return "";
}).timeout(CONFIGURED_TIMEOUT, TimeUnit.MILLISECONDS);

Single<Long> getUserProfileInfo = Single.fromCallable(() -> {
    try {
        Thread.sleep(500);
    } catch (Exception e) {
    }
    return 1L;
}).timeout(CONFIGURED_TIMEOUT, TimeUnit.MILLISECONDS);


TestObserver<String> testObs = Single.zip(
        getNewMessages,
        getUserProfileInfo,
        (a, b) -> a + " --- " + b)
        .onErrorResumeNext(Single.just("Special item"))
        .test();

testObs
        .awaitCount(1)
        .assertNoErrors()
        .assertValue("Special item");

或者,您可以在收到错误时使用 aMaybe发出 anempty并进行一些副作用处理:

TestObserver<String> testObs = Maybe.zip(
        getNewMessages.toMaybe(),
        getUserProfileInfo.toMaybe(),
        (a, b) -> a + " --- " + b)
        .doOnError(th -> System.out.println("Do side effect error handling"))
        .onErrorResumeNext(Maybe.empty())
        .test();

小心订单,doOnError如果你把它放在onErrorResumeNext前面,将不会收到任何东西。


推荐阅读