首页 > 解决方案 > akka 流如何处理它自己产生源的多个源

问题描述

嗨,我有一个来源,它自己产生来源。预先不知道将产生的源数量。是否有适当的设计模式来处理这种情况。基本上它看起来像 Source ----->Multiple Sources ------->Sink

编辑

方案如下。

  1. 从数据库迭代器中创建 Source
  2. 对于上述源提供的每个数据库文件,将文件转换为源
  3. 将这些动态创建的源附加到文件 IO 接收器

基本上我希望通过带有背压的流将一堆数据库内容写入单独的文件

标签: akka-stream

解决方案


给定一个Source来源:

type Data = ???

val sos : Source[Source[Data, _], _] = ???

每个Data源都可以使用Source.runForeach.

我们首先需要一个可以生成Path您希望写入数据的函数:

val pathCreator : () => Path = ???

以及一种转换DataByteString

val dataToByteString : Data => ByteString = ???

这些功能最终可以组合起来以获得您正在寻找的行为:

val drainSourceToFile : Source[Data, _] => Future[IOResult] = 
  _.map(dataToByteString)
   .to(FileIO.toPath(pathCreator()))
   .run()

sos runForeach drainSourceToFile

如果您想要所有的IOResult值,FileIO.toPath以便您可以知道写入是否成功,那么您将需要一个稍微复杂的设置:

val allIOResults : Future[Seq[IOResult]] = 
  sos.map(drainSourceToFile)
     .to(Sink.seq)
     .run()
     .flatMap(Future.sequence)

推荐阅读