首页 > 解决方案 > 我的 Akka-http 流的每个部分都必须生成 Message 类型的输出吗?

问题描述

我正在尝试通过 WebSockets 使用流,并且我想解析接收到的消息并在它们到达接收器之前对其进行转换。但是,每当我使用不Message作为输入的 Source 或 Sink 时,我都会不断收到错误消息。

根据文档

因此,WebSocket 连接被建模为将 Flow[Message, Message, Mat] 连接到的东西或将 Source[Message, Mat] 和 Sink[Message, Mat] 连接到的 Flow[Message, Message, Mat] .

我仍然不确定我是否正确理解了这一点。我的困惑是:使用 Akka-http websockets 的源、流和接收器是否必须始终传递类型Message?有办法解决吗?最重要的是,这里的最佳实践是什么?

我制定了我的代码的简化片段(不是可运行的),而是应该有助于概念化我的问题。

val outgoing = Source.maybe[Message]

val decoder = Flow[Message] map {x => TextMessage("Hello from decode")}

// Do I need to pass a Message here?
val wrongDecoder = Flow[String] map {x => "Help :( I can't Sink! Maybe because I'm String?"}

val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {case message: TextMessage.Strict => message.text}

val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))

val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right)
    .viaMat(decoder)(Keep.left)
    .viaMat(wrongDecoder)(Keep.left) // IDE compiler tells me it expected a Graph but found a Flow?
    .toMat(sink)(Keep.both)
    .run()

标签: scalaakkaakka-streamakka-http

解决方案


是的,流必须采取和发出Messages。

val source: Source[Message, SoMat] = ???

val flow: Flow[Message, Message, FMat] = ???

val sink: Sink[Message, SiMat] = ???

source.via(flow).runWith(sink)  // Ignoring which materializations you'd actually want to keep

这似乎是限制性的,但请注意一些签名:

// in Source (with types at least partially expanded)
def map[T](f: Message => T): Source[T, SoMat]

// in Flow (with types at least partially expanded)
def map[T](f: Message => T): Flow[Message, T, FMat]

// in Sink (with types at least partially expanded)
def contramap[T](f: T => Message): Sink[T, SiMap]

即,如果您具有从该类型的功能,您可以将 aSource[Message]映射到您选择的任何类型。同样,您可以采用 a并将其转换为任何类型的 a。对于 a ,您可以将其映射为任何类型并将其映射回.SourceMessageSink[Message]SinkFlowMessage

例如,您可以

val outgoing = Source.maybe[Message]
val decoder: Flow[Message, String, NotUsed] = Flow[Message].mapConcat { m =>
  m match {
    case TextMessage.Strict(msg) => List(msg)
    case _ => Nil
  }
}

val stringProcessor: Flow[String, String, NotUsed] = Flow[String].map { s => s.replace(':', ';') }

val sink: Sink[String, Future[Done]] = Sink.foreach[Message] {
  case TextMessage.Strict(text) => println(text)
}.contramap[String] { s => TextMessage.Strict(s) }

val websocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))

val (upgradeResponse, closed) =
  outgoing
    .viaMat(websocketFlow)(Keep.right)
    .viaMat(decoder)(Keep.left)
    .viaMat(stringProcessor)(Keep.left)
    .toMat(sink)(Keep.both)
    .run()

我可能会将websocketFlowand组合decoder成一个流 from Messageto String(或任何适用于业务逻辑的域类型(stringProcessor在这种情况下)),使用Sink如上所述的反映射。

val stringsFromWebsocket: Flow[Message, String, Future[WebsocketUpgradeResponse]] =
  Http()
    .webSocketClientFlow(WebSocketRequest(uri))
    .viaMat(decoder)(Keep.left)

val (upgradeResponse, closed) =
  outgoing
    .viaMat(stringsFromWebsocket)(Keep.right)
    .viaMat(stringProcessor)(Keep.left)
    .toMat(sink)(Keep.both)
    .run()

推荐阅读