首页 > 解决方案 > Keep组合的意义?

问题描述

我正在尝试Keep在 akka 流中组合并创建以下示例:

import java.nio.file.Paths

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scala.util.{Failure, Success}

object FileConsumer extends App {

  implicit val system = ActorSystem("reactive-tweets")
  implicit val materializer = ActorMaterializer()

  val source: Source[Int, NotUsed] = Source(1 to 100)
  val factorials = source.scan(BigInt(1))((acc, next) => acc * next)

  val result: Future[IOResult] =
    factorials.map(_.toString).runWith(lineSink("factorial2.txt"))

  implicit val ec = system.dispatcher
  result.onComplete {
    case Success(v) => println(s"Fileinfo ${ v.count }")
    case Failure(e) => println(e)
  }

  def lineSink(filename: String): Sink[String, Future[IOResult]] =
    Flow[String].map(s => ByteString(s + "\n")).toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)


} 

akka 流网站上它说:

生成的蓝图是 a Sink[String, Future[IOResult]],这意味着它接受字符串作为其输入,并且在具体化时它将创建类型的辅助信息Future[IOResult](当在 Source 或 Flow 上链接操作时,辅助信息的类型 - 称为“具体化值”)最左边的起点;因为我们想保留FileIO.toPath水槽所提供的东西,所以我们需要说Keep.right)。

但是,当我想保留ByteString左侧时,我尝试过:

  def lineSink2(filename: String): Sink[String, Future[ByteString]] =
Flow[String].map(s => ByteString(s + "\n")).toMat(Sink.foreach(println))(Keep.left)

但它根本不编译。

我也不明白:

由最左边的起点给出

最左边的起点是Flow

我想,我还不明白这个想法Keep

标签: scalaakkaakka-stream

解决方案


Sink.foreach 的定义如下:

def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]

这意味着物化价值是 Future[Done]

在流量的情况下,您有:

 val value: Flow[String, ByteString, NotUsed] = Flow[String].map(s => ByteString(s + "\n"))

它的物化值是 NotUsed

在这种情况下:

Keep.left - NotUsed - 源或流的物化值

Keep.right - Future[Done] - Sink 的物化价值

Keep.both -(未使用,未来[完成])

重要的事实是,在许多情况下,物化价值不是流经流的元素的价值,而是

  • 诊断信息
  • 流状态
  • 有关流的其他类型的信息

推荐阅读