android - 使用 Flowables.combineLatest 时出现 MissingBackpressureException
问题描述
我有这些代码:
Flowables.combineLatest(
flowableWithLatestBackpressureStrategyA,
flowableWithLatestBackpressureStrategyB
) { a, b -> /** transform result **/ }
.observeOn(AndroidSchedulers.mainThread())
.doOnError { /** do something **/ }
.subscribe(someBehaviorProcessor, Timber::e)
这会定期导致 MissingBackpressureException 错误消息“由于缺少请求而无法交付价值”。完整的堆栈跟踪:
io.reactivex.exceptions.MissingBackpressureException: Could not deliver value due to lack of requests
at io.reactivex.processors.BehaviorProcessor$BehaviorSubscription.test(BehaviorProcessor.java:647)
at io.reactivex.processors.BehaviorProcessor$BehaviorSubscription.emitNext(BehaviorProcessor.java:620)
at io.reactivex.processors.BehaviorProcessor.onNext(BehaviorProcessor.java:281)
at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.drain(FlowableOnBackpressureLatest.java:129)
at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.onNext(FlowableOnBackpressureLatest.java:68)
at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestCoordinator.drainAsync(FlowableCombineLatest.java:374)
at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestCoordinator.drain(FlowableCombineLatest.java:406)
at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestCoordinator.innerValue(FlowableCombineLatest.java:250)
at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestInnerSubscriber.onNext(FlowableCombineLatest.java:521)
at io.reactivex.processors.BehaviorProcessor$BehaviorSubscription.test(BehaviorProcessor.java:640)
at io.reactivex.processors.BehaviorProcessor$BehaviorSubscription.emitNext(BehaviorProcessor.java:620)
at io.reactivex.processors.BehaviorProcessor.onNext(BehaviorProcessor.java:281)
at ***.***.messaging.modules.home.HomePresenter$onBindView$7.accept(HomePresenter.kt:104)
at ***.***.messaging.modules.home.HomePresenter$onBindView$7.accept(HomePresenter.kt:22)
at io.reactivex.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65)
at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.drain(FlowableOnBackpressureLatest.java:129)
at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.onNext(FlowableOnBackpressureLatest.java:68)
at io.reactivex.internal.operators.flowable.FlowableFromObservable$SubscriberObserver.onNext(FlowableFromObservable.java:54)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62)
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52)
at io.reactivex.internal.util.NotificationLite.accept(NotificationLite.java:246)
at io.reactivex.subjects.BehaviorSubject$BehaviorDisposable.test(BehaviorSubject.java:569)
at io.reactivex.subjects.BehaviorSubject$BehaviorDisposable.emitNext(BehaviorSubject.java:564)
at io.reactivex.subjects.BehaviorSubject.onNext(BehaviorSubject.java:268)
at ***.***.core.EventBus.post(EventBus.kt:25)
at ***.***.messaging.receivers.NotificationReceiver.onReceive(NotificationReceiver.kt:31)
at android.app.ActivityThread.handleReceiver(ActivityThread.java:3040)
at android.app.ActivityThread.-wrap18(ActivityThread.java)
at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1561)
at android.os.Handler.dispatchMessage(Handler.java:102)
at android.os.Looper.loop(Looper.java:154)
at android.app.ActivityThread.main(ActivityThread.java:6119)
at java.lang.reflect.Method.invoke(Method.java)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
我最初认为问题在于生成的 flowable 本身没有背压策略,所以我尝试添加一个;但是,错误仍然存在。
lowables.combineLatest(
flowableWithLatestBackpressureStrategyA,
flowableWithLatestBackpressureStrategyB
) { a, b -> /** transform result **/ }
.observeOn(AndroidSchedulers.mainThread())
.doOnError { /** do something **/ }
.subscribe(someBehaviorProcessor, Timber::e)
该消息是什么意思,我该怎么做才能避免出现异常?
解决方案
推荐阅读
- javascript - 如何在一个文件中加入两个以上分离的 JS 库
- apache - SSL 在 xampp 中引发错误,即使在重新安装 Apache 时也是如此
- c++ - 使 Qmake 项目中的源代码依赖于协议缓冲区
- java - 带有 Oracle com.zaxxer.hikari.pool.HikariProxyCallableStatement 的 HikariCP 无法转换为 oracle.jdbc.OracleCallableStatement
- c# - 使用特定于环境的 appsettings.json 文件时,是否有办法按设置替换而不是添加?
- python - 如何从网页中的表格中抓取所有元素?
- opengl - 链接失败 - SDL(ld:未找到框架 SDL)
- unix - 如何实时读取目录中创建的 Unix 日志?
- google-app-maker - 显示上传到应用资源的图像
- python-3.x - 为什么不能导入子模块,而可以导入模块?