scala - 在连接源和接收器之前等待客户端 websocket 流连接
问题描述
我正在使用 akka-streams 来设置客户端 Web 套接字。我正在尝试将设置封装在具有以下签名的方法中:
def createConnectedWebSocket(url: String): Flow[Message, Message, _]
很清楚如何创建 Web 套接字流,但尚未连接:
val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest(url))
我首先要Await
升级响应未来,然后返回套接字流。但是,为了获得未来,我必须实现流程,为此我必须连接 aSource
和 a Sink
。但这应该是其他一些适配器类的责任,例如序列化和反序列化 json 对象并公开Flow[JsValue, JsValue, _]
. 它不应该担心连接丢失时可能会重新连接(一旦我设法编写它,这种行为将成为我方法的更复杂版本的一部分)。它应该只需要处理一个简单的Flow
.
我设法通过使用集线器实现了我想要的部分功能:
val mergeHubSource = MergeHub.source[Message](perProducerBufferSize = 16)
val broadcastHubSink = BroadcastHub.sink[Message](bufferSize = 16)
val ((messageSink, upgradeResponse), messageSource) =
mergeHubSource
.viaMat(webSocketFlow)(Keep.both)
.toMat(broadcastHubSink)(Keep.both)
.run()
所以现在我有一个Source
和一个Sink
我可以组合成一个Flow
并返回它。问题是,我对集线器功能不感兴趣。当我将 a 连接Source
到结果Flow
并关闭它时,这应该传播到套接字,即套接字应该关闭。使用 aMergeHub
时,它保持打开状态以便能够接受新来源。
这可能吗?我认为我可以与自定义演员弥合差距,但感觉就像我在这里重新发明了一些可能已经以另一种形式实现的东西。
解决方案
我找到了使用SourceRef
and的解决方案SinkRef
。尽管它们旨在用于弥合两台机器之间的差距,但它们也可以在这里使用。
val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest(someUrl))
val (sinkRefFuture, sourceRefFuture) =
StreamRefs.sinkRef[In]()
.viaMat(f)(Keep.left)
.toMat(StreamRefs.sourceRef[Out]())(Keep.both)
.run()
val flow = Flow.fromSinkAndSource(await(sinkRefFuture), await(sourceRefFuture))
await()
例如像这样定义:
def await[T, F <: T](f: Future[F]): T = Await.result(f, 3.seconds)
话虽如此,我认为至少在我的情况下,实际上最好不要提前实现套接字。这样,无论谁使用它,都可以重新连接。我现在正在传递一个流工厂,它根据需要创建 Web 套接字的新实例Flow
(可能只有我实现一次)。
推荐阅读
- java - 如何使用Spring根据查询参数键和值进行过滤
- docker - 如何在 Cloud Run 上自动部署 Cloud Build 构建的 docker 镜像
- azure - Azure - 无法从私有 Azure 存储库运行映像
- java - 当密钥使用文件夹结构定义时,我们如何从 AWS S3 存储桶中获取对象?
- c++ - 如何停止这个模板化函数值初始化一个新构造的对象?
- visual-studio-2019 - Visual Studio 2019 的指标边距上的各种括号是什么意思?
- arrays - 将 JSON 结构转换为数组:Groovy
- windows - 启动命令后终端窗口未关闭
- c# - C# 防止其他应用程序中的鼠标悬停效果
- typescript - Gatsby 与 Typescript 导入资产错误 (ts2307)