scala - WebSocket 端点中的“Spawn”并发效果
问题描述
我有以下代码:
class ApiRoutes2[F[_]](implicit F: ConcurrentEffect[F]) extends Http4sDsl[F] {
var queue = Queue.bounded[F, String](100)
def createService(queue: Queue[F, String]): F[Unit] = ???
val service: HttpRoutes[F] = HttpRoutes.of[F] {
case PUT -> Root / "services" =>
val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
case Text(t, _) => F.delay(println(t))
case f => F.delay(println(s"Unknown type: $f"))
}
// How to "spawn" createService?
toClientF.flatMap { toClient =>
WebSocketBuilder[F].build(toClient, fromClient)
}
}
}
createService
是一个创建新服务的函数。创建新服务是一个非常复杂的过程,它涉及触发 CI 管道,等待它们完成,然后以相同的方式触发更多 CI 管道。它接收到的队列将用于向浏览器报告当前正在执行的操作。
我想同时“生成” createService 并让它运行直到完成。但是同时我想立即将 WebSocket 返回给客户端。Aka 我不能在“生成”createService 时阻止。
我被困住了。我只能考虑使用shift
,但这意味着 for comprehension 中的下一行将阻止等待createService
完成,然后将 websocket 返回给客户端。
我的方法错了吗?我究竟做错了什么?
解决方案
由于F
是 的一个实例ConcurrentEffect
,因此您也有一个Concurrent
实例。
因此,您可以使用Concurrent[F].start
which 返回 aFiber
到正在运行的操作(如果您不需要取消/确保完成,则可以忽略 Fiber)。
val service: HttpRoutes[F] = HttpRoutes.of[F] {
case PUT -> Root / "services" =>
val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
case Text(t, _) => F.delay(println(t))
case f => F.delay(println(s"Unknown type: $f"))
}
for {
toClient <- toClientF
_ <- Concurrent[F].start(createService)
websocket <- WebSocketBuilder[F].build(toClient, fromClient)
} yield websocket
}
推荐阅读
- python - 如何用颜色进行线性插值?
- python - 数据框 pyspark 从前一行更新行
- algorithm - 使 BITWISE AND 积极
- node.js - 如何测试 jest + sequelize?
- python - 命中KeyboardInterrupt子进程时如何将最后一个输出保存在变量中
- node.js - 带有集合 {merge} 的 Firestore 安全规则
- c++ - 使用 C++ 的条件
- regex - 使用正则表达式连接火花中的两个数据帧
- javascript - 将 React 应用程序部署到生产环境返回“您需要启用 JavaScript 才能运行此应用程序”
- powershell - Azure DevOps PowerShell 任务无法加载“Az.CosmosDB”模块