首页 > 解决方案 > Rx 运算符。忽略直到发出下一个

问题描述

在我的应用程序中,我有可以通过多种方式启动的耗时逻辑,比如说自动或由用户手动启动。

// Let's describe different event sources as relays
val autoStarts = PublishRelay.create<Unit>()
val manualStarts = PublishRelay.create<Unit>()
val syncStarts = PublishRelay.create<Unit>()

// This is my time consuming operation.
fun longOperation() = Observable.interval(10, TimeUnit.SECONDS).take(1).map { Unit }

val startsDisposable = Observable
        .merge(
                autoStarts.flatMap { Observable.just(Unit).delay(30, TimeUnit.SECONDS) },
                manualStarts
        )
        .subscribe(syncStarts) // merge emissions of both sources into one

val syncDisposable = syncStarts
        .concatMap {
            longOperation()
        }
        .subscribe(autoStarts) // end of long operation trigger start of auto timer

启动继电器会产生很多排放。假设用户单击手动启动按钮,距离计时器自动启动还有 5 秒。longOperation()如果简单的话,这两个事件都会导致开始flatMap。我只希望一个线程在longOperation()里面运行,所以如果它现在正在运行并且没有完成 - 忽略开始排放,无论如何完成都会导致计时器重新启动。

ConcatMap帮助我一半 - 它添加longOperation()到“队列”,因此它们被一个一个地处理,但我怎么能写这个来忽略任何进一步的开始,直到第一个完全完成?

标签: javaandroidkotlinrx-javareactivex

解决方案


您可以使用flatMap()额外的整数参数来限制并行度。

syncStarts
  .onBackpressureDrop()               // 1
  .flatMap(() -> longOperation(), 1)  // 2
  ...
  1. flatMap()丢弃忙碌时发生的任何排放。
  2. 数字 1 是订阅的数量flatMap(),本质上是强制操作是连续的。

以上执行您想要的功能。但是,您没有指定您希望在longOperation()运行后发生什么:您是否希望在之后立即开始另一个操作?如果是这样,您需要更改背压处理以最多排队一次排放。


推荐阅读