scala - Akka 流:为什么 Sink.head 用alsoTo 广播终止流?
问题描述
我们来看一个非常简单的案例:
Source(1 to 10)
.alsoTo(Sink.foreach(v => println(s"each: $v")))
.toMat(Sink.head)(Keep.right)
.run()
根据alsoTo
文档,我希望Sink.foreach
打印所有元素,但是,它只先打印。如果我切换Sink.foreach
和Sink.head
放置,也会发生同样的情况。
但是,如果广播是通过实现GraphDSL
的,则即使其中一个接收器是,也会消耗整个源Sink.head
。
编辑:文档alsoTo
说明如下:
将给定的 Sink 附加到此 Flow,这意味着通过此 Flow 的元素也将被发送到 Sink。
对我来说,这听起来像是广播,但也许这就是我犯错误的地方。我也可以解释为toMat
控制流程。因此,我希望以下内容从源中获取所有元素:
Source(1 to 10)
.alsoTo(Sink.head)
.toMat(Sink.seq)(Keep.right)
.run()
GraphDSL 版本按我的预期工作:
val s1 = Sink.foreach[Int](v => println(s"each: $v"))
val s2 = Sink.head[Int]
val source = Source(1 to 10)
RunnableGraph.fromGraph(GraphDSL.create(s1, s2)((_, _)) { implicit builder => (s1, s2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
source ~> broadcast.in
broadcast.out(0) ~> s1
broadcast.out(1) ~> s2
ClosedShape
}).run()
解决方案
原因是Sink.head
消耗单个元素并完成自身。这以 a 的形式向上游传播,cancel
此后不会从 Source 发送任何元素。
来自的代码akka.stream.impl.HeadOptionStage.onPush
显示它
def onPush(): Unit = {
p.trySuccess(Option(grab(in)))
completeStage()
}
在哪里completeStage
在所有已调用的输入或输出端口上自动调用 [[cancel()]] 或 [[complete()]],然后将操作符标记为已停止。
更新
alsoTo
是使用以下参数配置的广播:
val bcast = b.add(Broadcast[Out](2, eagerCancel = true))
您的GraphDSL
版本工作方式不同,因为默认广播是eagerCancel = false
.
在哪里eagerCancel
如果为真,则广播在其任何下游取消时取消上游。
推荐阅读
- python - 生成非常具体的向量
- python - 机器人框架 - 获取变量
- python - win32com.client 发送不同的附件而不是固定的文件路径
- python - 在递归中使用和不使用'return'调用函数有什么区别?
- python - 如何在kivy的gridLayout中更改特定按钮的颜色
- ssl - 是否可以有没有主机名的入口 tls?
- javascript - Bluebeam 或 Adobe SaveAs 通过使用 JS 的提交按钮
- graph - 3D 图形网络拓扑未知搜索命令拟合
- haskell - ghc 无法编译任何东西并给出看似无关的错误消息
- php - 使用PHP增加变量后如何通过if语句重申?