rx-java - RxJava 2 中的 Accumulate 和 Emit 行为
问题描述
我有一个Status
代表某些任务状态的类。我有一个带有以下签名的方法
def update(existing: Status, update: Status): Status
该函数采用旧状态和新更新,并返回Status
结合这两者的新对象。
我希望自定义主题的行为如下:
写行为
- 它应该包含一个
Status
. 让我们称之为current
- 我应该能够订阅给定的
Flowable<Status>
- 在每个新的状态更新(通过上面的
Flowable
),它会调用update
上面提到的方法并将结果存储在current
.
- 它应该包含一个
阅读行为
- 定期读取,应该能够定期发出当前状态。就像是
Flowable.interval(1 second).map(i->current)
- 定期读取,应该能够定期发出当前状态。就像是
我可以使用一个StatusHolder
用于写入行为的类(它进行保存和更新)和单独订阅Flowable.interval(1 second).map(i->holder.current)
.
在实现了这个之后,我遇到了 的概念Subject
,它既是一个Observable
又是Observer
。我想要的功能与此类似。我想要一个可以接收和发出Status
对象但需要在接收时进行一些计算的类。
我查看了现有的Subject
实现,我认为它们中的任何一个都不自然地支持这种行为。第二件事是,它们在Observable
, not上运行Flowable
,所以我需要这样做toFlowable
并toObservable
使用它。
有没有更好的方法来实现这种行为?
解决方案
让我们定义source
为您的 source Flowable<Status>
。
Flowable<Status> source = ...
然后你可以试试这个:
Flowable.interval(1, TimeUnit.SECONDS)
.withLatestFrom(source.scan(this::update), (i, status) -> status)
.share();
.interval()
运算符定期发出一个长值。.scan()
运算符“累积”源流发出的项目。在你的情况下,累加器是.update
功能.withLatestFrom()
结合了两个流。这取代了你的.map(i->holder.current)
.share()
允许所有订阅者共享一个订阅。
一个小的旁注:
有一个Flowable
版本Subject
,称为Processor
.
推荐阅读
- php - Yii2 swiftmailer 不通过控制台发送电子邮件
- javascript - 为什么我得到一个 TypeError 而不是一个函数。在数组对象上调用 forEach
- python - django Select_related():从另一个模型有一个 ForginKey 到将执行 Select_related 的模型
- azure-data-factory - 如何在 Synapse 复制活动中从 $$FILEPATH 中提取文件名?
- javascript - 如何在 Charts.js 3.x 中将“hh:mm:ss”字符串解析为时间
- php - 如何检查护照个人令牌是否已过期或被撤销?
- typescript - TypeScript - What does importing and exporting interfaces as type do?
- java - Android - FileNotfoundexception permission denied for copied files
- parallel-processing - 如何在非常大的图上运行算法?
- php - 如何在 Laravel 中使用两个表进行自定义排序