scala - 使用 Akka Stream,如何动态复制流?
问题描述
我正在运行一个实时视频流服务器。有Array[Byte]
视频源。请注意,我无法与我的视频源建立 2 个连接。我希望每个连接到我的服务器的客户端都接收相同的流,并带有一个丢弃旧帧的缓冲区。
我尝试使用BroadcastHub
这样的:
val source =
Source.fromIterator(() => myVideoStreamingSource.zipWithIndex)
val runnableGraph =
source.toMat(BroadcastHub.sink(bufferSize = 2))(Keep.right)
runnableGraph.run().to(Sink.foreach { index =>
println(s"client A reading frame #$index")
}).run()
runnableGraph.run().to(Sink.foreach { index =>
println(s"client B reading frame #$index")
}).run()
我得到:
client A reading frame #0
client B reading frame #1
client A reading frame #2
client B reading frame #3
我们看到主流在两个客户端之间进行了分区,而我希望我的两个客户端能够看到所有源流的帧。
我错过了什么,还是有其他解决方案?
解决方案
问题是Iterator
with的组合BroadcastHub
。我假设你myVideoStreamingSource
是这样的:
val myVideoStreamingSource = Iterator("A","B","C","D","E")
我现在引用BroadcastHub.Sink
:
[[Sink]] 的每一个新物化都会产生一个新的独立集线器,该集线器会物化到它自己的 [[Source]] 以消耗该物化的 [[Sink]]。
这里的问题是它还没有使用迭代器中的数据。
迭代器的问题是,一旦你消费了它的数据,你就不会再回到开始了。再加上两个图并行运行的事实,看起来它“划分”了两者之间的元素。但实际上这完全是随机的。例如,如果您在Client A
和之间添加 1 秒的睡眠Client B
,那么将打印的唯一客户端将是A
.
为了完成这项工作,您需要创建一个可逆的源。例如Seq
, 或List
。以下将做:
val myVideoStreamingSource = Seq("A","B","C","D","E")
val source = Source.fromIterator(() => myVideoStreamingSource.zipWithIndex.iterator)
推荐阅读
- sql - SQL 计算 max(x) 和 min(x)+y 之间的 x
- excel - 运行代码后,范围值发生变化并出现错误 REF
- c# - 即使应用程序关闭 SQL C#,如何保持用户登录
- javascript - React Context API 功能组件
- git - 尽管没有进行任何更改,但 Git 本地主机仍处于领先地位
- javascript - 如何在反应中使用 Jest 进行异步按钮单击
- python - 尝试使用 XPath 查询从属性中提取数据时获取空列表
- python - Pandas DataFrame 在特定日期时间之前出现的列中数据的平均值
- python-3.x - 使整个 tkinter 窗口变暗
- jquery - 在数据表中显示下拉列表