java - RxJava2 - 在平行轨道上发出 onError 将得到 UndeliverableException
问题描述
尽管其中一个 rails 是发出的onError()
,但isCancelled()
在另一个 rails 中仍然返回 false,这会导致UndeliverableException
. 如何检查下游是否取消平行轨道?
Disposable disposable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
System.out.println("Flowable.create-emitter.isCancelled:" + emitter.isCancelled());
for (int i = 1; i < 10; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER).parallel(6).runOn(Schedulers.io())
.flatMap(new Function<Integer, Publisher<String>>() {
@Override
public Publisher<String> apply(Integer t) throws Exception {
// TODO Auto-generated method stub
return Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
System.out.println("flatMap-before onError-isCancelled:" + emitter.isCancelled());
try {
if (true) { // assume trigger the error
throw new Exception("Test");
}
if (!emitter.isCancelled()) {
emitter.onNext(String.valueOf((t + 1)));
emitter.onComplete();
}
} catch (Exception ex) {
if (!emitter.isCancelled()) {
emitter.onError(ex);
}
}
System.out.println("flatMap-after onError-isCancelled:" + emitter.isCancelled());
}
}, BackpressureStrategy.BUFFER);
}
}).sequential().subscribeOn(scheduler).observeOn(Schedulers.single())
.subscribeWith(new ResourceSubscriber<String>() {
public void onComplete() {
System.out.println("onComplete");
}
public void onError(Throwable arg0) {
System.out.println("onError:" + arg0.toString());
}
public void onNext(String arg0) {
System.out.println("onNext:" + arg0);
}
});
解决方案
我找到了解决方案。我需要添加一个全局错误消费者来解决问题。 https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling
推荐阅读
- android - 可过滤的 RecyclerView 期望变量初始化
- python - 如何在分组后合并列并选择熊猫数据框中其他列的第一个有效值?
- javascript - 按下回车键时移动 + 提交输入
- android - Android:错误找不到符号 NotificationCompat.DecoratedMediaCustomViewStyle
- angular - 使用 NgRx 将对象数据集成到组件中,具有本地副本
- android - 在一个 Firebase 项目中设置多个平台
- windows - 如何将新的文本字符串插入现有文本文件中的特定位置
- android - Visual Studio Code - 找不到 Android SDK
- node-red - 如何直接在 UI 上更改节点红色仪表板图的 Y 轴最小值和最大值?
- c++ - 尝试在 C++ 中重新加载运算符,但它似乎不起作用