scala - Keep组合的意义?
问题描述
我正在尝试Keep
在 akka 流中组合并创建以下示例:
import java.nio.file.Paths
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future
import scala.util.{Failure, Success}
object FileConsumer extends App {
implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorMaterializer()
val source: Source[Int, NotUsed] = Source(1 to 100)
val factorials = source.scan(BigInt(1))((acc, next) => acc * next)
val result: Future[IOResult] =
factorials.map(_.toString).runWith(lineSink("factorial2.txt"))
implicit val ec = system.dispatcher
result.onComplete {
case Success(v) => println(s"Fileinfo ${ v.count }")
case Failure(e) => println(e)
}
def lineSink(filename: String): Sink[String, Future[IOResult]] =
Flow[String].map(s => ByteString(s + "\n")).toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)
}
在akka 流网站上它说:
生成的蓝图是 a
Sink[String, Future[IOResult]]
,这意味着它接受字符串作为其输入,并且在具体化时它将创建类型的辅助信息Future[IOResult]
(当在 Source 或 Flow 上链接操作时,辅助信息的类型 - 称为“具体化值”)最左边的起点;因为我们想保留FileIO.toPath
水槽所提供的东西,所以我们需要说Keep.right
)。
但是,当我想保留ByteString
左侧时,我尝试过:
def lineSink2(filename: String): Sink[String, Future[ByteString]] =
Flow[String].map(s => ByteString(s + "\n")).toMat(Sink.foreach(println))(Keep.left)
但它根本不编译。
我也不明白:
由最左边的起点给出
最左边的起点是Flow
?
我想,我还不明白这个想法Keep
。
解决方案
Sink.foreach 的定义如下:
def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
这意味着物化价值是 Future[Done]
在流量的情况下,您有:
val value: Flow[String, ByteString, NotUsed] = Flow[String].map(s => ByteString(s + "\n"))
它的物化值是 NotUsed
在这种情况下:
Keep.left - NotUsed - 源或流的物化值
Keep.right - Future[Done] - Sink 的物化价值
Keep.both -(未使用,未来[完成])
重要的事实是,在许多情况下,物化价值不是流经流的元素的价值,而是
- 诊断信息
- 流状态
- 有关流的其他类型的信息
推荐阅读
- python - 无法在 django 中使用 facebook 登录
- python - rabbitmq, python - ack 消费者程序示例
- python - 如何使用 pandas 进行条件聚合
- selenium - 使用 WebDriver 运行代码接收时出现错误“[PHPUnit\Framework\Exception] Undefined index: ELEMENT”
- javascript - 在每个渲染上重新添加 React hooks 事件侦听器(exhaustive-deps 错误)
- python - 如何计算标签的使用次数并在 html 中显示?
- c# - 如何使用 c# 遍历文件和活动目录权限?
- javascript - 如何在外部点击时关闭 div 部分?
- java - Hibernate:通过从某个位置读取 hbm 映射文件来创建动态表
- mysql - mysqldump 用于旧版本的 mysql,从 mysql 8 到 mysql 5.7