首页 > 解决方案 > Akka Streams - 创建一个在接收到输入时从源发出的流

问题描述

我有一个Source提供类型 A 的元素。

我有一个Flow接收类型 B 的元素。

我想做的是,当流接收到输入时,源中的下一个元素作为流的输出发出。

我目前这样做的方式是将源连接到Sink.queue. 然后对于流中的每个元素,我映射它,丢弃输入,并从队列中提取下一个值。一旦队列为空,我就完成了流程。

我觉得应该有一种我想念的更简单的方法。可能有一些内置机制允许输入从源触发元素。

例如:

val source = ... some akka streams source

val queue = source.grouped(limit.toInt).runWith(Sink.queue[Seq[DataFrame]])

Flow[Message]
  .prepend(Source.single(TextMessage.Strict("start")))
  .collect {
    case TextMessage.Strict(text) => Future.successful(text)
    case TextMessage.Streamed(textStream) => textStream.runFold("")(_ + _).flatMap(Future.successful)
  }
  // swallow the future of the incoming message's text
  .mapAsync(1)(identity)
  // take the next batch
  .mapAsync(1)(_ => queue.pull())
  // swallow the option monad, and add in an end or page-end message
  .collect {
  case Some(batch) if batch.size == limit => batch.toList :+ pageend
  case Some(batch) => batch.toList :+ end
  case None => List(end)
}
  // flatten out the frames
  .mapConcat(identity)

end并且pageend只是 ui 使用的特殊框架。问题的关键部分是围绕队列的这种使用。

标签: akka-streamakka-http

解决方案


推荐阅读