首页 > 解决方案 > 我从 Monix firstOptionL 得到不一致的结果 - 竞争条件?

问题描述

我从对已转换为 Observable 的同一(MongoDB)数据库调用的重复调用中得到间歇性缺失值。我已经删除了所有数据库代码,以获得一个只有 Monix 位的最小测试用例,而且我偶尔仍然会丢失值 - 通常每 2,000 次测试一到两个。

根据文档 ConcurrentSubject 的意思是“不需要遵循背压合同”,但无论我是否这样做,我都会遇到类似的失败。

import monix.eval.Task
import monix.reactive.{MulticastStrategy, Observable}
import monix.reactive.subjects.ConcurrentSubject
import org.scalatest.FunSuite

import scala.concurrent.Await
import scala.concurrent.duration.Duration

class Test_JustMonix extends FunSuite {

  implicit val scheduler = monix.execution.Scheduler.global

  def build(): Observable[Boolean] = {
    val subject = ConcurrentSubject(MulticastStrategy.publish[Boolean])
    subject.doAfterSubscribe {
      Task.eval {
        subject.onNext(true)
        subject.onComplete()
      }
    }
  }

  test("just monix") {
    (0 until 20).foreach { loop =>
      println(s"loop $loop")
      val tOpts = (0 until 100).map { _ => build().firstOptionL }
      val tDone = Task.gather(tOpts).map { list =>
        val emptyCount = list.count(_.isEmpty)
        assert(emptyCount === 0)
      }
      Await.result(tDone.runToFuture, Duration.Inf)
    }
    println("Finished")
  }
}

在某些运行中,所有 20x100 循环都正确完成 - firstOptionL isDefined 用于所有 2,000 个结果。但是,超过 50% 的时间 assert(emptyCount === 0) 在值为 1 或有时为 2 时触发,这表明我偶尔会得到一个 None 值,好像 onComplete 发生在 onNext 之前?

这可能发生在 20 个循环中的任何一个中,因此它看起来像是一种竞争条件,或者我误解了所需的输入。我已经尝试了几乎所有主题 - PublishSubject,有和没有 BufferedSubscriber,并且都给出了相似的结果。

我也尝试过将 onComplete 延迟到 Ack via

subject.onNext(true).map(_=> subject.onComplete())

这似乎会稍早失败。

我也试过 MulticastStrategy.replay 没有区别。

我在 Scala 2.12.8 上使用 Monix 3.0.0-RC3。

标签: scalamonix

解决方案


推荐阅读