首页 > 解决方案 > RxJava2 合并和多播事件流

问题描述

我有一个事件源,它发出一个Snapshot后跟Deltas假设它看起来有点像

Flowable.just("A", "1", "2", "3")

其中“A”是快照,“1”、“2”、“3”是更新。

我希望第一个订阅者检索

"A", "1", "2", "3"

第二个订阅者(假设它发生在“2”和“3”之间)接收

"A".apply("1").apply("2"), "3"

所以我正在寻找的是一个多播流的操作符,但是在所有后续订阅者的下一个发射之前从第一个发射到下一个发射之前发出合并的值。

有人可以指出我正确的方向吗?开始编写自定义运算符,但我觉得我缺少一些简单的东西。

标签: rx-java2

解决方案


和我的同事一起做了更多的工作,我们提出了以下建议

Flowable.just("A", "1", "2", "3")
 .scan(Tuple(empty, empty), (prev, update) -> build "snapshot" "last delta" tuple
 .skip(1)
 .replay(1) // replay tuple of "current snapshot" "latest delta"
 .refCount() 
 .scan(empty, (prev, update) -> empty == prev ? update.snapshot : update.delta)
 .skip(1)

工作得很好,希望对下一个尝试同样事情的人有用


推荐阅读