akka-stream - akka 流如何处理它自己产生源的多个源
问题描述
嗨,我有一个来源,它自己产生来源。预先不知道将产生的源数量。是否有适当的设计模式来处理这种情况。基本上它看起来像 Source ----->Multiple Sources ------->Sink
编辑
方案如下。
- 从数据库迭代器中创建 Source
- 对于上述源提供的每个数据库文件,将文件转换为源
- 将这些动态创建的源附加到文件 IO 接收器
基本上我希望通过带有背压的流将一堆数据库内容写入单独的文件
解决方案
给定一个Source
来源:
type Data = ???
val sos : Source[Source[Data, _], _] = ???
每个Data
源都可以使用Source.runForeach
.
我们首先需要一个可以生成Path
您希望写入数据的函数:
val pathCreator : () => Path = ???
以及一种转换Data
为ByteString
:
val dataToByteString : Data => ByteString = ???
这些功能最终可以组合起来以获得您正在寻找的行为:
val drainSourceToFile : Source[Data, _] => Future[IOResult] =
_.map(dataToByteString)
.to(FileIO.toPath(pathCreator()))
.run()
sos runForeach drainSourceToFile
如果您想要所有的IOResult
值,FileIO.toPath
以便您可以知道写入是否成功,那么您将需要一个稍微复杂的设置:
val allIOResults : Future[Seq[IOResult]] =
sos.map(drainSourceToFile)
.to(Sink.seq)
.run()
.flatMap(Future.sequence)
推荐阅读
- sql - 显示所有日期范围的数据,包括缺失的日期
- highcharts - 如何在highcharts的工具提示中转换日期格式的时间戳?
- nginx - PrestaShop - 无法访问 nginx 上的管理面板
- ios - Xamarin.Forms 如何在 Android 和 iOS 上添加应用评分?
- javascript - 处理多个输入
- c# - CefSharp:ChromiumWebBrowser 中是否有左键单击处理程序?
- sql - 如果sql中的一个字段相同,如何过滤上一条记录
- chef-infra - 资源“模板[/etc/awslogs//cwlogs.cfg]”上的“创建”错误
- python - 有没有一种更有效的方法来获取与python中矩阵内的特定元素具有一个欧几里德距离的元素的所有索引?
- mongodb - 当foreignField在数组中时如何在聚合中执行查找?