java - Project Reactor - 有状态地组合两个发布者并发出结果
问题描述
我想用 Reactor 设计一个处理管道,如下所示。
我有两个输入发布者orderEntries
(冷)和hotBroadcasts
(热)。我想将发出的项目聚合hotBroadcasts
到(可变)内存数据结构中 - 比如说一个HashMap
- 对于每个项目,orderEntries
我想从该 Map 中选择一个相应的元素,创建结果项目并推送到下游订阅者。
来自的事件hotBroadcasts
以任意顺序出现,这就是为什么我想将它们存储在内存中以便于检索。
从概念上讲,它应该像这样工作:
orderEntries hotBroadcasts
| |
| |
| |
\ /
----------------> <----------------
(aggregate events from hotBroadcasts)
|
|
resulting item
|
|
\/
downstream subcriber
到目前为止,我设法用ReplayProcessor
Kotlin 伪示例说明了一个带有 的解决方案:
val orderEntries = Flux.interval(Duration.of(1, ChronoUnit.SECONDS))
val hotBroadcasts = ReplayProcessor.create<String>(1000, false)
orderEntries.concatMap { entryId ->
// problematic filter - skims through all that ReplayProcessor has cached
hotBroadcasts.filter { broadcastId ->
"Broadcast:$entryId" == broadcastId
}
.take(1)
.map { "EntryId: $entryId, BroadcastId: $it" }
}.subscribe { LOG.info(it) }
Flux.interval(Duration.of(200, ChronoUnit.MILLIS))
.concatMap { Flux.just(it, it - 100000) }
.map { "Broadcast:$it" }
.subscribe {
hotBroadcasts.onNext(it)
}
这里的问题是hotBroadcast
从orderEntries
. 因此我的想法是将它们存储在 HashMap 中。
谁能指出我正确的方向?
解决方案
可以聚合来自两个不同发布者的消息的对象是具有 2 个参数的异步过程调用。这样的调用可以在 rxjava 中使用 构建,也可以io.reactivex.Single.zip(SingleSource arg1, SingleSource arg2, BiFunction func)
在纯 Java 中使用java.util.concurrent.CompletableFuture.thenCombine(CompletionStage arg2, BiFunction func)
.
您需要一个特殊的 HashMap 来保存异步过程调用。当第一次使用给定标签访问此 HashMap 时,应自动创建调用。
所以一位 Publicher 调用
asyncProc=callMap.get(label); // asyncProc is created and stored with the label as a key
asyncProc.arg1.complete(value);
和其他 Publicher 调用
asyncProc=callMap.get(label); // previously created instance returned
asyncProc.arg2.complete(value);
在两个发布者都提供了他们的参数之后,异步过程被执行。
推荐阅读
- javascript - @composi/gestures 滑动点亮元素
- gsm - Internet 服务命令(AT 命令)出现未知错误
- excel - 如何在企业 SAS 中添加多个工作表以 excel?
- javascript - 反应原生 | useState 异步等待
- sql - 两个表的本机外连接 - 如何重写为 ANSI 连接?
- html - 如何修复 div 中的 div 高度和图像大小?
- amazon-athena - 我不能在 athena 上取消嵌套子数组
- c++ - 内存分配如何工作以及何时发生?
- kubernetes - Mac 上 Docker for Desktop 中 hostPath 卷的基本目录是什么?
- python - 运行文件夹中的所有 std 输入测试文件并验证结果?(Python)