首页 > 解决方案 > 在 Akka Streams 中使用 Keep.right / Keep.left 不会影响结果

问题描述

我试图想出不同的例子来理解 Keep.left 或 Keep.right 的工作

我有以下代码:

val numSource = Source(1 to 10)
  val numSource = Source(1 to 10)
  val incrementFlow = Flow[Int].map(x => x +1)
  val doubleFlow = Flow[Int].map(x => x * 10)

  val someFuture: NotUsed =           numSource.via(incrementFlow).via(doubleFlow).to(Sink.foreach(println)).run
  val someOtherFuture: Future[Done] = numSource.viaMat(incrementFlow)(Keep.left).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
  val someOtherFuture2: Future[Done] = numSource.viaMat(incrementFlow)(Keep.right).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()

我假设在someOtherFuture viaMat(incrementFlow)(Keep.left)中将忽略元素的增量,因为我使用的是源的物化值(而不是结果值 = Keep.right),并且图形的结果将等于 num * 10。但所有3 行给了我相同的结果: //20,30,40 .. 110

我在这里缺少什么?我已经检查了文档并尝试实现简单的示例,但看起来我对实现的想法有误。或者它可能是因为我正在使用一个没有任何接收两个输入的合并流的序列图?

标签: scalaakka-stream

解决方案


物化值最好理解为流之外的代码已经开始(无论是仍在运行还是已完成/失败)以某种方式与流交互的方法:它是运行流的返回值,我们可以看到Keep.right/Keep.left在您发布的代码中的效果:

val someFuture: NotUsed =           numSource.via(incrementFlow).via(doubleFlow).to(Sink.foreach(println)).run
val someOtherFuture: Future[Done] = numSource.viaMat(incrementFlow)(Keep.left).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
val someOtherFuture2: Future[Done] = numSource.viaMat(incrementFlow)(Keep.right).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()

numSourceincrementFlowdoubleFlowis的具体化值NotUsed基本上意味着这些阶段不会为流之外的代码公开任何与流交互的方式。

对于Sink.foreach,具体化值是 a ,如果流运行直到源耗尽并且所有其他阶段都成功,则该值Future[Done]将成功Done完成,或者以包含流失败原因的失败完成。


推荐阅读