首页 > 解决方案 > 使用 Akka Stream,如何动态复制流?

问题描述

我正在运行一个实时视频流服务器。有Array[Byte]视频源。请注意,我无法与我的视频源建立 2 个连接。我希望每个连接到我的服务器的客户端都接收相同的流,并带有一个丢弃旧帧的缓冲区。

我尝试使用BroadcastHub这样的:

  val source =
    Source.fromIterator(() => myVideoStreamingSource.zipWithIndex)

  val runnableGraph =
    source.toMat(BroadcastHub.sink(bufferSize = 2))(Keep.right)

  runnableGraph.run().to(Sink.foreach { index =>
      println(s"client A reading frame #$index")
  }).run()

  runnableGraph.run().to(Sink.foreach { index =>
      println(s"client B reading frame #$index")
  }).run()

我得到:

client A reading frame #0
client B reading frame #1
client A reading frame #2
client B reading frame #3

我们看到主流在两个客户端之间进行了分区,而我希望我的两个客户端能够看到所有源流的帧。

我错过了什么,还是有其他解决方案?

标签: scalaakka-stream

解决方案


问题是Iteratorwith的组合BroadcastHub。我假设你myVideoStreamingSource是这样的:

val myVideoStreamingSource = Iterator("A","B","C","D","E")

我现在引用BroadcastHub.Sink

[[Sink]] 的每一个新物化都会产生一个新的独立集线器,该集线器会物化到它自己的 [[Source]] 以消耗该物化的 [[Sink]]。

这里的问题是它还没有使用迭代器中的数据。

迭代器的问题是,一旦你消费了它的数据,你就不会再回到开始了。再加上两个图并行运行的事实,看起来它“划分”了两者之间的元素。但实际上这完全是随机的。例如,如果您在Client A和之间添加 1 秒的睡眠Client B,那么将打印的唯一客户端将是A.

为了完成这项工作,您需要创建一个可逆的源。例如Seq, 或List。以下将做:

val myVideoStreamingSource = Seq("A","B","C","D","E")
val source = Source.fromIterator(() => myVideoStreamingSource.zipWithIndex.iterator)

推荐阅读