scala - Scala 和 fs2/cats 的 WebSocket 挑战
问题描述
我正在使用 Http4s 安装一个 websocket 服务,我可以使用该服务在此后端服务和 UI 之间进行通信(批处理作业的管道状态更新和完成百分比)。
我正在使用BlazeBuilder Websocket 示例来设置服务。
该服务有效,但我想做的是从类实例中发出套接字消息。例如,我想实例化一个工作线程,传递套接字连接的引用,并能够将数据发送到该连接。不幸的是,我很难完成这项工作!它在 Python 和 JS 中要简单得多。
请参阅下面的代码,主要是我上面链接的示例代码。在我调用 Stream.emit(...) 的地方,我怎样才能传递对该“toClient”的引用并仍然向它发出?如果我将 toClient 实例传递给类实例,它似乎不起作用。
case GET -> Root / "ws" =>
val toClient: Stream[F, WebSocketFrame] = Stream.emit(Text("How can I do this from a class instance?"))
val fromClient: Sink[F, WebSocketFrame] = _.evalMap { (ws: WebSocketFrame) =>
ws match {
case Text(t, _) => F.delay(println(t))
case f => F.delay(println(s"Unknown type: $f"))
}
}
WebSocketBuilder[F].build(toClient, fromClient)
解决方案
您可以使用MVar与 websocket 进行线程安全通信。
这是一个使用 Cats IO Effect 的示例:
final class WebSocketServer(implicit timer: Timer[IO]) extends Http4sDsl[IO] {
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
def start: IO[ExitCode] = {
BlazeServerBuilder[IO]
.bindHttp(8080)
.withWebSockets(true)
.withHttpApp(routes.orNotFound)
.resource
.use(_ => IO.never)
.as(ExitCode.Success)
}
private[this] val routes: HttpRoutes[IO] = HttpRoutes.of[IO] {
case GET -> Root / "ws" => {
for {
channel <- cats.effect.concurrent.MVar[IO].empty[List[WebSocketFrame]]
webSocket <- {
WebSocketBuilder[IO].build(
send = fs2.Stream
.eval(channel.take)
.flatMap(fs2.Stream.emits(_))
.repeat,
receive = stream => {
stream.evalMap {
case Text(data, _) => channel.put(List(Text("pong")))
case unknown => IO(println(s"Unknown type: $unknown"))
}
}
)
}
} yield webSocket
}
}
}
如果要将消息发送回客户端,则必须将其放入 MVar。
channel.put(List(Text("pong")))
有趣的部分是轮询 MVar 以获取新消息以发送回 WebSocket 客户端的重复流。
fs2.Stream.eval(channel.take).flatMap(fs2.Stream.emits(_).repeat
推荐阅读
- android - 在我的应用程序中,我想选择多个图像,但只能是第一次
- sql - 如何从字符串中删除特殊字符和数字但忽略空格
- twig - 如何翻译标题内的文本字符串包括使用 trans
- mysql - Laravel - 使用 Laravel 查询老化报告
- wpf - 如何从 wpf 中动态创建的文本框中检索值?
- python - NoReverseMatch - 在 Django 中重置密码
- flutter - 如何在颤动中将小部件覆盖在背景小部件上?
- java - 此异常“HTTP 500 Erreur Interne de Servlet”的可能原因是什么?
- datetime - 在 Elixir 中构建具有排除项的一系列日期时间
- database - DolphinDB如何从分布式表中清除历史数据?