java - Rx 运算符。忽略直到发出下一个
问题描述
在我的应用程序中,我有可以通过多种方式启动的耗时逻辑,比如说自动或由用户手动启动。
// Let's describe different event sources as relays
val autoStarts = PublishRelay.create<Unit>()
val manualStarts = PublishRelay.create<Unit>()
val syncStarts = PublishRelay.create<Unit>()
// This is my time consuming operation.
fun longOperation() = Observable.interval(10, TimeUnit.SECONDS).take(1).map { Unit }
val startsDisposable = Observable
.merge(
autoStarts.flatMap { Observable.just(Unit).delay(30, TimeUnit.SECONDS) },
manualStarts
)
.subscribe(syncStarts) // merge emissions of both sources into one
val syncDisposable = syncStarts
.concatMap {
longOperation()
}
.subscribe(autoStarts) // end of long operation trigger start of auto timer
启动继电器会产生很多排放。假设用户单击手动启动按钮,距离计时器自动启动还有 5 秒。longOperation()
如果简单的话,这两个事件都会导致开始flatMap
。我只希望一个线程在longOperation()
里面运行,所以如果它现在正在运行并且没有完成 - 忽略开始排放,无论如何完成都会导致计时器重新启动。
ConcatMap
帮助我一半 - 它添加longOperation()
到“队列”,因此它们被一个一个地处理,但我怎么能写这个来忽略任何进一步的开始,直到第一个完全完成?
解决方案
您可以使用flatMap()
额外的整数参数来限制并行度。
syncStarts
.onBackpressureDrop() // 1
.flatMap(() -> longOperation(), 1) // 2
...
flatMap()
丢弃忙碌时发生的任何排放。- 数字 1 是订阅的数量
flatMap()
,本质上是强制操作是连续的。
以上执行您想要的功能。但是,您没有指定您希望在longOperation()
运行后发生什么:您是否希望在之后立即开始另一个操作?如果是这样,您需要更改背压处理以最多排队一次排放。
推荐阅读
- c# - 在运行时更改增强对象(AR 核心)的颜色
- java - 多个用户可以访问一个 azure clientID 吗?
- powershell - AWX/Ansible 中的 PowerShell foreach 循环
- sql - 选择两个日期之间的假期计数
- c# - ASP.NET Core 3.0 Razor Pages Select Helper Not Posting Value
- azure-devops - 从 Azure Pipelines 更新 JIRA 问题
- python - 为什么我的 python 应用程序的 cron 作业总是返回 301 响应代码?
- elixir - 如何使用 Elixir 从 Mailgun 中邮寄的存储中获取附件
- sql - 我想问一个关于 ORACLE SQL 的问题
- opencv - 压力到图像和图像到压力的转换