首页 > 解决方案 > 合并后如何保留两个可观察对象发出的项目的顺序?

问题描述

我遇到了令我惊讶的 Scala Observables 行为。考虑下面我的例子:

object ObservablesDemo extends App {

  val oFast = Observable.interval(3.seconds).map(n => s"[FAST] ${n*3}")
  val oSlow = Observable.interval(7.seconds).map(n => s"[SLOW] ${n*7}")
  val oBoth = (oFast merge oSlow).take(8)

  oBoth.subscribe(println(_))
  oBoth.toBlocking.toIterable.last

}

该代码演示了从两个可观察对象发射元素。其中一个以“慢”的方式(每 7 秒)发射其元素,另一个以“快速”的方式(每 3 秒)发射。为了这个问题,假设我们想使用map函数定义这些可观察对象,并从上面看到的适当映射数字interval(而不是另一种可能的方法,即从两个可观察对象以相同的速率发射项目,然后filter根据需要退出)。

代码的输出对我来说似乎违反直觉:

[FAST] 0
[FAST] 3
[SLOW] 0
[FAST] 6
[FAST] 9   <-- HERE
[SLOW] 7   <-- HERE
[FAST] 12
[FAST] 15

有问题的部分是当 observable 在[FAST]observable 发出9之前[SLOW]发出7。我希望在第 7 秒发出的内容之前发出,因为在第 9 秒发出的内容之前79

我应该如何修改代码以实现预期的行为?我查看了 RxScala 文档并开始搜索不同的interval函数和Scheduler类等主题,但我不确定它是否是搜索答案的正确位置。

标签: scalaobservablerx-javareactivexrx-scala

解决方案


这看起来像它应该工作的方式。这里列出了秒数和事件。您可以验证RXScalaTestObserverTestScheduler是否可用。RXScala 在 2019 年停产,所以请记住这一点。

Secs   Event
-----------------
1
2
3      [Fast] 0
4
5
6      [Fast] 3
7      [Slow] 0
8
9      [Fast] 6
10
11
12     [Fast] 9
13
14     [Slow] 7
15     [Fast] 12
16
17
18     [Fast] 15
19
20
21     [Fast] 18

推荐阅读