scala - 我从 Monix firstOptionL 得到不一致的结果 - 竞争条件?
问题描述
我从对已转换为 Observable 的同一(MongoDB)数据库调用的重复调用中得到间歇性缺失值。我已经删除了所有数据库代码,以获得一个只有 Monix 位的最小测试用例,而且我偶尔仍然会丢失值 - 通常每 2,000 次测试一到两个。
根据文档 ConcurrentSubject 的意思是“不需要遵循背压合同”,但无论我是否这样做,我都会遇到类似的失败。
import monix.eval.Task
import monix.reactive.{MulticastStrategy, Observable}
import monix.reactive.subjects.ConcurrentSubject
import org.scalatest.FunSuite
import scala.concurrent.Await
import scala.concurrent.duration.Duration
class Test_JustMonix extends FunSuite {
implicit val scheduler = monix.execution.Scheduler.global
def build(): Observable[Boolean] = {
val subject = ConcurrentSubject(MulticastStrategy.publish[Boolean])
subject.doAfterSubscribe {
Task.eval {
subject.onNext(true)
subject.onComplete()
}
}
}
test("just monix") {
(0 until 20).foreach { loop =>
println(s"loop $loop")
val tOpts = (0 until 100).map { _ => build().firstOptionL }
val tDone = Task.gather(tOpts).map { list =>
val emptyCount = list.count(_.isEmpty)
assert(emptyCount === 0)
}
Await.result(tDone.runToFuture, Duration.Inf)
}
println("Finished")
}
}
在某些运行中,所有 20x100 循环都正确完成 - firstOptionL isDefined 用于所有 2,000 个结果。但是,超过 50% 的时间 assert(emptyCount === 0) 在值为 1 或有时为 2 时触发,这表明我偶尔会得到一个 None 值,好像 onComplete 发生在 onNext 之前?
这可能发生在 20 个循环中的任何一个中,因此它看起来像是一种竞争条件,或者我误解了所需的输入。我已经尝试了几乎所有主题 - PublishSubject,有和没有 BufferedSubscriber,并且都给出了相似的结果。
我也尝试过将 onComplete 延迟到 Ack via
subject.onNext(true).map(_=> subject.onComplete())
这似乎会稍早失败。
我也试过 MulticastStrategy.replay 没有区别。
我在 Scala 2.12.8 上使用 Monix 3.0.0-RC3。
解决方案
推荐阅读
- apache - 通过 htaccess 文件重写 URL 时遇到问题
- ionic-framework - 为什么在输入命令 Ionic Serve 时出现错误
- ruby - 无法渲染仅在 Heroku 上导入 google-maps-react 的反应组件
- django - Django-REST:数据库驱动程序不支持现代数据时间类型
- c# - 有没有办法在渲染之前从父组件枚举子内容?
- mongodb - 在 Scrapy 插入之前检查 MongoDB 中是否存在记录
- google-apps-script - Apps 脚本拒绝连接
- python - 将数据框列值转换为列表
- cordova - 如何修复“运行子进程cordova时发生错误”
- blockchain - 我将如何列出地址中出现子字符串“XYZ”的所有 BTC 地址?