首页 > 解决方案 > Scala 和 fs2/cats 的 WebSocket 挑战

问题描述

我正在使用 Http4s 安装一个 websocket 服务,我可以使用该服务在此后端服务和 UI 之间进行通信(批处理作业的管道状态更新和完成百分比)。

我正在使用BlazeBuilder Websocket 示例来设置服务。

该服务有效,但我想做的是从类实例中发出套接字消息。例如,我想实例化一个工作线程,传递套接字连接的引用,并能够将数据发送到该连接。不幸的是,我很难完成这项工作!它在 Python 和 JS 中要简单得多。

请参阅下面的代码,主要是我上面链接的示例代码。在我调用 Stream.emit(...) 的地方,我怎样才能传递对该“toClient”的引用并仍然向它发出?如果我将 toClient 实例传递给类实例,它似乎不起作用。

case GET -> Root / "ws" =>
      val toClient: Stream[F, WebSocketFrame] = Stream.emit(Text("How can I do this from a class instance?"))
      val fromClient: Sink[F, WebSocketFrame] = _.evalMap { (ws: WebSocketFrame) =>
        ws match {
          case Text(t, _) => F.delay(println(t))
          case f => F.delay(println(s"Unknown type: $f"))
        }
      }
      WebSocketBuilder[F].build(toClient, fromClient)

标签: scalawebsockethttp4s

解决方案


您可以使用MVar与 websocket 进行线程安全通信。

这是一个使用 Cats IO Effect 的示例:

final class WebSocketServer(implicit timer: Timer[IO]) extends Http4sDsl[IO] {

  implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def start: IO[ExitCode] = {
    BlazeServerBuilder[IO]
      .bindHttp(8080)
      .withWebSockets(true)
      .withHttpApp(routes.orNotFound)
      .resource
      .use(_ => IO.never)
      .as(ExitCode.Success)
  }

  private[this] val routes: HttpRoutes[IO] = HttpRoutes.of[IO] {
    case GET -> Root / "ws" => {
      for {
        channel <- cats.effect.concurrent.MVar[IO].empty[List[WebSocketFrame]]
        webSocket <- {
          WebSocketBuilder[IO].build(
            send = fs2.Stream
              .eval(channel.take)
              .flatMap(fs2.Stream.emits(_))
              .repeat,
            receive = stream => {
              stream.evalMap {
                case Text(data, _)   => channel.put(List(Text("pong")))
                case unknown         => IO(println(s"Unknown type: $unknown"))
              }
            }
          )
        }
      } yield webSocket
    }
  }
}

如果要将消息发送回客户端,则必须将其放入 MVar。

channel.put(List(Text("pong")))

有趣的部分是轮询 MVar 以获取新消息以发送回 WebSocket 客户端的重复流。

fs2.Stream.eval(channel.take).flatMap(fs2.Stream.emits(_).repeat

推荐阅读