scala - 合并后如何保留两个可观察对象发出的项目的顺序?
问题描述
我遇到了令我惊讶的 Scala Observables 行为。考虑下面我的例子:
object ObservablesDemo extends App {
val oFast = Observable.interval(3.seconds).map(n => s"[FAST] ${n*3}")
val oSlow = Observable.interval(7.seconds).map(n => s"[SLOW] ${n*7}")
val oBoth = (oFast merge oSlow).take(8)
oBoth.subscribe(println(_))
oBoth.toBlocking.toIterable.last
}
该代码演示了从两个可观察对象发射元素。其中一个以“慢”的方式(每 7 秒)发射其元素,另一个以“快速”的方式(每 3 秒)发射。为了这个问题,假设我们想使用map
函数定义这些可观察对象,并从上面看到的适当映射数字interval
(而不是另一种可能的方法,即从两个可观察对象以相同的速率发射项目,然后filter
根据需要退出)。
代码的输出对我来说似乎违反直觉:
[FAST] 0
[FAST] 3
[SLOW] 0
[FAST] 6
[FAST] 9 <-- HERE
[SLOW] 7 <-- HERE
[FAST] 12
[FAST] 15
有问题的部分是当 observable 在[FAST]
observable 发出9
之前[SLOW]
发出7
。我希望在第 7 秒发出的内容之前发出,因为在第 9 秒发出的内容之前7
。9
我应该如何修改代码以实现预期的行为?我查看了 RxScala 文档并开始搜索不同的interval
函数和Scheduler
类等主题,但我不确定它是否是搜索答案的正确位置。
解决方案
这看起来像它应该工作的方式。这里列出了秒数和事件。您可以验证RXScalaTestObserver
中TestScheduler
是否可用。RXScala 在 2019 年停产,所以请记住这一点。
Secs Event
-----------------
1
2
3 [Fast] 0
4
5
6 [Fast] 3
7 [Slow] 0
8
9 [Fast] 6
10
11
12 [Fast] 9
13
14 [Slow] 7
15 [Fast] 12
16
17
18 [Fast] 15
19
20
21 [Fast] 18
推荐阅读
- javascript - 在 javascript 中从 MongoDB 数据库中检索数据
- pandas - 为某个列值分组行
- django - 无法在 django 中保存 ManyToMany 字段?
- c++ - 如何写入在全局和 pthread 函数之外定义的向量
- scala - Product.scala trait 中 productPrefix def 的实际使用
- python - 在 Python 输出上使用 Bash“测试”运算符?
- svelte - 如何在 Svelte 中禁用组件挂载和销毁的过渡动画?
- google-apps-script - 关于简单代码的表单提交触发器,但有两个表单链接到 Google 表格电子表格
- angular - 导入的库 - 模拟方法和排除
- leaflet - 迭代 Leaflet 标记簇组并更新标记的标题