首页 > 解决方案 > RxJava 2 中的 Accumulate 和 Emit 行为

问题描述

我有一个Status代表某些任务状态的类。我有一个带有以下签名的方法

def update(existing: Status, update: Status): Status

该函数采用旧状态和新更新,并返回Status结合这两者的新对象。

我希望自定义主题的行为如下:

我可以使用一个StatusHolder用于写入行为的类(它进行保存和更新)和单独订阅Flowable.interval(1 second).map(i->holder.current).

在实现了这个之后,我遇到了 的概念Subject,它既是一个Observable又是Observer。我想要的功能与此类似。我想要一个可以接收和发出Status对象但需要在接收时进行一些计算的类。

我查看了现有的Subject实现,我认为它们中的任何一个都不自然地支持这种行为。第二件事是,它们在Observable, not上运行Flowable,所以我需要这样做toFlowabletoObservable使用它。

有没有更好的方法来实现这种行为?

标签: rx-javareactive-programmingrx-java2reactive

解决方案


让我们定义source为您的 source Flowable<Status>

Flowable<Status> source = ...

然后你可以试试这个:

Flowable.interval(1, TimeUnit.SECONDS)
        .withLatestFrom(source.scan(this::update), (i, status) -> status)
        .share();
  • .interval()运算符定期发出一个长值。
  • .scan()运算符“累积”源流发出的项目。在你的情况下,累加器是.update功能
  • .withLatestFrom()结合了两个流。这取代了你的.map(i->holder.current)
  • .share()允许所有订阅者共享一个订阅。


一个小的旁注:

有一个Flowable版本Subject,称为Processor.


推荐阅读