首页 > 解决方案 > 使用间隔时的背压

问题描述

我有这个 RxJava2 可流动的

private val pulseFlowable = Flowable
   .interval(200, TimeUnit.MILLISECONDS)
   .subscribeOn(Schedulers.computation())

和这个订阅者

pulseFlowable
   .observeOn(Schedulers.computation())
   .subscribe(
      object : Subscriber<Long> {
         override fun onSubscribe(subscription: Subscription) {
            pulseSubscription = subscription
         }

         override fun onNext(long: Long?) {
            onPulse()
         }

         override fun onError(throwable: Throwable) {
            Timber.e("onError")
         }

         override fun onComplete() {
            Timber.d("onComplete")
         }
      }
   )

它产生一个MissingBackPressureException,我“有点”理解:由于我在同一个调度程序上订阅和观察,我最初认为可能不会发生背压,但也许这是一个误解。

当我将流动性更改为

private val pulseFlowable = Flowable
   .interval(200, TimeUnit.MILLISECONDS)
   .onBackpressureLatest()
   .subscribeOn(Schedulers.computation())

我不再得到异常,但也没有调用 onNext。这是为什么?

标签: rx-java2backpressure

解决方案


推荐阅读