akka-stream - Akka Streams - 创建一个在接收到输入时从源发出的流
问题描述
我有一个Source
提供类型 A 的元素。
我有一个Flow
接收类型 B 的元素。
我想做的是,当流接收到输入时,源中的下一个元素作为流的输出发出。
我目前这样做的方式是将源连接到Sink.queue
. 然后对于流中的每个元素,我映射它,丢弃输入,并从队列中提取下一个值。一旦队列为空,我就完成了流程。
我觉得应该有一种我想念的更简单的方法。可能有一些内置机制允许输入从源触发元素。
例如:
val source = ... some akka streams source
val queue = source.grouped(limit.toInt).runWith(Sink.queue[Seq[DataFrame]])
Flow[Message]
.prepend(Source.single(TextMessage.Strict("start")))
.collect {
case TextMessage.Strict(text) => Future.successful(text)
case TextMessage.Streamed(textStream) => textStream.runFold("")(_ + _).flatMap(Future.successful)
}
// swallow the future of the incoming message's text
.mapAsync(1)(identity)
// take the next batch
.mapAsync(1)(_ => queue.pull())
// swallow the option monad, and add in an end or page-end message
.collect {
case Some(batch) if batch.size == limit => batch.toList :+ pageend
case Some(batch) => batch.toList :+ end
case None => List(end)
}
// flatten out the frames
.mapConcat(identity)
end
并且pageend
只是 ui 使用的特殊框架。问题的关键部分是围绕队列的这种使用。
解决方案
推荐阅读
- android - 如何将 CreateDataBinding 导入 Xamarin
- apache-spark - 如何使用 Spark 数据集和 UDF 分析类型不匹配错误
- c++ - C++ 字符串到 wstring 打印不正确,无法获取 unicode 路径
- vb6 - 在 vb6 中卸载动态创建的标签时出错
- mainframe - 在 IBM z 中是否有像 IBM i 中的显示文件 (dspf)?
- java - 我用Gradle插件编译Java代码的时候,本地JDK是1.8的,但是我想编译成1.7版本
- java - 请求在 chrome 中待处理
- javascript - Confluence “Hello World” 宏帮助 - NPM 安装/启动不工作
- r - 向量化嵌套 ifelse
- json - chatfuel Json api插件上的消息数组问题:仅发送文本对象但不发送模板