首页 > 解决方案 > 使用 Flowables.combineLatest 时出现 MissingBackpressureException

问题描述

我有这些代码:

Flowables.combineLatest(
      flowableWithLatestBackpressureStrategyA,
      flowableWithLatestBackpressureStrategyB
) { a, b -> /** transform result **/ }
      .observeOn(AndroidSchedulers.mainThread())
      .doOnError { /** do something **/ }
      .subscribe(someBehaviorProcessor, Timber::e)

这会定期导致 MissingBackpressureException 错误消息“由于缺少请求而无法交付价值”。完整的堆栈跟踪:

io.reactivex.exceptions.MissingBackpressureException: Could not deliver value due to lack of requests
    at io.reactivex.processors.BehaviorProcessor$BehaviorSubscription.test(BehaviorProcessor.java:647)
    at io.reactivex.processors.BehaviorProcessor$BehaviorSubscription.emitNext(BehaviorProcessor.java:620)
    at io.reactivex.processors.BehaviorProcessor.onNext(BehaviorProcessor.java:281)
    at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.drain(FlowableOnBackpressureLatest.java:129)
    at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.onNext(FlowableOnBackpressureLatest.java:68)
    at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestCoordinator.drainAsync(FlowableCombineLatest.java:374)
    at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestCoordinator.drain(FlowableCombineLatest.java:406)
    at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestCoordinator.innerValue(FlowableCombineLatest.java:250)
    at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestInnerSubscriber.onNext(FlowableCombineLatest.java:521)
    at io.reactivex.processors.BehaviorProcessor$BehaviorSubscription.test(BehaviorProcessor.java:640)
    at io.reactivex.processors.BehaviorProcessor$BehaviorSubscription.emitNext(BehaviorProcessor.java:620)
    at io.reactivex.processors.BehaviorProcessor.onNext(BehaviorProcessor.java:281)
    at ***.***.messaging.modules.home.HomePresenter$onBindView$7.accept(HomePresenter.kt:104)
    at ***.***.messaging.modules.home.HomePresenter$onBindView$7.accept(HomePresenter.kt:22)
    at io.reactivex.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65)
    at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.drain(FlowableOnBackpressureLatest.java:129)
    at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.onNext(FlowableOnBackpressureLatest.java:68)
    at io.reactivex.internal.operators.flowable.FlowableFromObservable$SubscriberObserver.onNext(FlowableFromObservable.java:54)
    at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62)
    at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52)
    at io.reactivex.internal.util.NotificationLite.accept(NotificationLite.java:246)
    at io.reactivex.subjects.BehaviorSubject$BehaviorDisposable.test(BehaviorSubject.java:569)
    at io.reactivex.subjects.BehaviorSubject$BehaviorDisposable.emitNext(BehaviorSubject.java:564)
    at io.reactivex.subjects.BehaviorSubject.onNext(BehaviorSubject.java:268)
    at ***.***.core.EventBus.post(EventBus.kt:25)
    at ***.***.messaging.receivers.NotificationReceiver.onReceive(NotificationReceiver.kt:31)
    at android.app.ActivityThread.handleReceiver(ActivityThread.java:3040)
    at android.app.ActivityThread.-wrap18(ActivityThread.java)
    at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1561)
    at android.os.Handler.dispatchMessage(Handler.java:102)
    at android.os.Looper.loop(Looper.java:154)
    at android.app.ActivityThread.main(ActivityThread.java:6119)
    at java.lang.reflect.Method.invoke(Method.java)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)

我最初认为问题在于生成的 flowable 本身没有背压策略,所以我尝试添加一个;但是,错误仍然存​​在。

lowables.combineLatest(
      flowableWithLatestBackpressureStrategyA,
      flowableWithLatestBackpressureStrategyB
) { a, b -> /** transform result **/ }
      .observeOn(AndroidSchedulers.mainThread())
      .doOnError { /** do something **/ }
      .subscribe(someBehaviorProcessor, Timber::e)

该消息是什么意思,我该怎么做才能避免出现异常?

标签: androidkotlinrx-javarx-java2

解决方案


推荐阅读