首页 > 解决方案 > 在流中发送 actorRefWithAck

问题描述

我正在使用这个线程的答案,因为我需要特别对待第一个元素。问题是,我需要将此数据发送到另一个 Actor 或在本地保存(这是不可能的)。

所以,我的流看起来像这样:

val flow: Flow[Message, Message, (Future[Done], Promise[Option[Message]])] = Flow.fromSinkAndSourceMat(
  Flow[Message].mapAsync[Trade](1) {
    case TextMessage.Strict(text) =>
      Unmarshal(text).to[Trade]
    case streamed: TextMessage.Streamed =>
      streamed.textStream.runFold("")(_ ++ _).flatMap(Unmarshal(_).to[Trade])
  }.groupBy(pairs.size, _.s).prefixAndTail(1).flatMapConcat {
    case (head, tail) =>
      // sending first element here
      val result = Source(head).to(Sink.actorRefWithAck(
        ref = actor,
        onInitMessage = Init,
        ackMessage = Ack,
        onCompleteMessage = "done"
      )).run()
      // some kind of operation on the result
    Source(head).concat(tail)
  }.mergeSubstreams.toMat(sink)(Keep.right),
  Source.maybe[Message])(Keep.both)

这是一个好习惯吗?会不会产生意想不到的后果?不幸的是,我无法调用persist内部流,所以我想将此数据发送到外部系统。

标签: scalaakkaakka-stream

解决方案


您当前的方法不会result以任何方式使用,因此更简单的替代方法是触发并忘记第一个Message演员:

groupBy(pairs.size, _.s).prefixAndTail(1).flatMapConcat {
  case (head, tail) =>
    // sending first element here
    actor ! head.head

    Source(head).concat(tail)
}

然后,参与者不必担心处理Init和发送Ack消息,并且可以只关心持久化Message实例。


推荐阅读