android - 在 RxJava 中合并两个具有独立和相等超时的流的正确方法
问题描述
我想知道如何最好地组合两个流,每个流在 RxJava 中都有相同持续时间的独立超时,而不会导致崩溃错误。
在 RxJava 中,如果运行以下代码,会导致崩溃异常:
val testObs = Single.fromCallable {
Thread.sleep(10000)
}.subscribeOn(Schedulers.io())
.timeout(5000L, TimeUnit.MILLISECONDS)
.timeout(5000L, TimeUnit.MILLISECONDS)
.test()
testObs.awaitTerminalEvent()
这是因为它们都在线程timeouts
上执行,线程是或可以是多线程的。其中一个超时将成功并关闭流,第二个将引发崩溃异常:computation
io.reactivex.exceptions.UndeliverableException:无法将异常传递给消费者,因为它已经取消/处置了流程,或者异常无处可去。
显然,一个明显的解决方案是不要将两个相同持续时间的超时附加到单个流。
然而,让我们想象一个简单的Api
类,它定义了两个方法:
fun getNewMessages: Single<List<Messsage>>
fun getUserProfileInfo: Single<Profile>
在您的代码中,假设您有单独Activity
的 s,每个 s 都独立地调用这些方法;您还有一个StartupActivity
调用这两种方法的方法,但用于Single.zip
合并这些操作中的每一个,因此它基本上可以预先获取数据并在调用finish()
启动启动屏幕之前配置应用程序并为用户准备好。这些方法中的每一个的实现都使用了一个通用的网络类,该类应用了.timeout
30 秒的标准值。
不幸的是,如果有一个网络问题导致这两种方法都停止超出它们的默认.timeout
值,Single.zip
那么结合这些网络操作的 a 将会崩溃。
因此,是否有推荐的模式来组合多个信号,每个信号都具有可能相等并因此同时触发的独立超时值?如果我们采用“除非您是最终消费者,否则不添加超时”的方法,最终将导致.timeout(30, TimeUnit.SECOND)
整个代码中出现大量复制粘贴调用。
谢谢!
解决方案
UndeliverableException
例外是包装:
java.util.concurrent.TimeoutException:源在 5000 毫秒内没有发出事件信号并且已终止
您对timeout
运算符是正确的,默认情况下它在计算调度程序中执行。以相同的持续时间链接它们将导致不确定的结果。有时第一个会触发TimeoutException
有时第二个。
您可以通过使用其中一种方法并发出组合对象的特殊实例来避免zip
运算符崩溃。onErrorX
前任 :
long CONFIGURED_TIMEOUT = 300L;
Single<String> getNewMessages = Single.fromCallable(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
return "";
}).timeout(CONFIGURED_TIMEOUT, TimeUnit.MILLISECONDS);
Single<Long> getUserProfileInfo = Single.fromCallable(() -> {
try {
Thread.sleep(500);
} catch (Exception e) {
}
return 1L;
}).timeout(CONFIGURED_TIMEOUT, TimeUnit.MILLISECONDS);
TestObserver<String> testObs = Single.zip(
getNewMessages,
getUserProfileInfo,
(a, b) -> a + " --- " + b)
.onErrorResumeNext(Single.just("Special item"))
.test();
testObs
.awaitCount(1)
.assertNoErrors()
.assertValue("Special item");
或者,您可以在收到错误时使用 aMaybe
发出 anempty
并进行一些副作用处理:
TestObserver<String> testObs = Maybe.zip(
getNewMessages.toMaybe(),
getUserProfileInfo.toMaybe(),
(a, b) -> a + " --- " + b)
.doOnError(th -> System.out.println("Do side effect error handling"))
.onErrorResumeNext(Maybe.empty())
.test();
小心订单,doOnError
如果你把它放在onErrorResumeNext
前面,将不会收到任何东西。
推荐阅读
- android - 如何更新 Rest Api URL?
- python - 如何通过共享一个轴制作多个散点图?
- python - 有没有一种很好的方法可以使它成为一个 for 循环?
- java - Java 流将对象 List 的每个属性设置为另一个 List
- gitlab-ci - 从 Gitlab CI 工作流部署到 Maven Central
- javascript - 将选中的 ng-repeat 值(复选框为真)作为数组参数发送到 javascript 函数中
- sql - 加快大型 Oracle 数据库中的 Oracle SQL Developer 查询
- c# - C#:从已知数量的构造函数中调用随机构造函数的聪明方法是什么?
- javascript - mongoose、mongodb 中无法引用方法`db.start Session()`
- c# - 如何防止递归复制文件和目录?