scala - FS2 队列生产者和消费者
问题描述
我曾经fs2.concurrent.Queue
通过队列尝试生产者/消费者。
val program = fs2.Stream.eval(Queue.bounded[IO, Option[Int]](maxSlots)).flatMap {
q =>
val p = customers[IO](10.millis, 100.millis, openingSeconds).evalMap(writeToQueue[IO](q))
val c = q.dequeue.unNoneTerminate.parEvalMap(maxBarbers)(barbers[IO](300.millis, 100.millis))
c concurrently p
}
我现在使用的解决方案是p
停止生产Some(id)
,其中 id 是客户的 id,openingSeconds
然后更改为 Stream ofNone
以终止c
ie q.dequeue.unNoneTerminate
。
我可以使用单个None
而不是重复的流None
来终止c
吗?
我遇到的问题None
是当队列q
已满时,单None
将不会插入队列,因为writeToQueue
当队列已满时不会接受新元素。p
并c
以不同的速度生产和消费。谢谢
解决方案
推荐阅读
- javascript - 更新功能失败,创建和删除工作
- python - 如何使用步骤对多维 numpy 数组进行子集化?
- azure-active-directory - PowerBI 访问安全的 rest api
- python - (Python:不和谐)错误:无法为使用 PEP 517 且无法直接安装的 multidict、yarl 构建轮子
- microsoft-edge - 没有插件或开发工具的 Microsoft Edge 用户代理覆盖
- heroku - 即使应用程序正在运行,Heroku 部署也会失败
- c++ - 放大的 OpenCV 图像
- powershell - 即使设置为 SilentlyContinue,PowerShell Remove-Item 也会返回错误
- discord.py - 可以回复的 Discord dm 机器人
- python - 使用 Concat 将系列移位连接到数据框