首页 > 解决方案 > FS2 队列生产者和消费者

问题描述

我曾经fs2.concurrent.Queue通过队列尝试生产者/消费者。

val program = fs2.Stream.eval(Queue.bounded[IO, Option[Int]](maxSlots)).flatMap {
      q =>
        val p = customers[IO](10.millis, 100.millis, openingSeconds).evalMap(writeToQueue[IO](q))
        val c = q.dequeue.unNoneTerminate.parEvalMap(maxBarbers)(barbers[IO](300.millis, 100.millis))
        c concurrently p
    }

我现在使用的解决方案是p停止生产Some(id),其中 id 是客户的 id,openingSeconds然后更改为 Stream ofNone以终止cie q.dequeue.unNoneTerminate

我可以使用单个None而不是重复的流None来终止c吗?

我遇到的问题None是当队列q已满时,单None将不会插入队列,因为writeToQueue当队列已满时不会接受新元素。pc以不同的速度生产和消费。谢谢

标签: scalafs2

解决方案


推荐阅读