scala - Akka 流未处理我的 CSV 文件中的所有行
问题描述
我正在使用 Akka 流来处理包含 1839 行的 CSV 文件。我添加了计数器来计算处理的行数。
这是我的来源,我确保输入文件中的每一行少于 700 个字符。
case class ParsedLine(input: String, field1: String, field2: String, field3: String)
val counter0 = new AtomicInteger()
val counter1 = new AtomicInteger()
val lineSource = FileIO
.fromPath(Paths.get(InputFile))
.via(Framing.delimiter(ByteString("\n"), 1024, allowTruncation = true))
.map { l =>
counter0.incrementAndGet()
l.utf8String
}
val parseLine = Flow[String].map { l =>
val words = l.split(",")
ParsedLine(l, words(0), words(1), words(2))
}
这个源的处理如下,对应源中的每一行,输出中应该有一个处理过的行。
val done = lineSource
.via(parseLine)
.to(Sink.foreach(_.input))
.run()
done.onComplete {
case Success(_) =>
println("Counter0 " + counter0.get())
println("Counter1 " + counter1.get())
system.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
system.terminate()
}
有趣的是计数器打印如下,每次我得到不同的数字。如果我删除该.to(Sink.foreach(_.input))
行,我得到的计数为 1839。
Counter0 1445
Counter1 1667
首先,我希望 Counter0 的值高于 Counter1,因为 Counter0 位于 Counter1 之前的阶段,我希望所有行都被处理并且计数器应该打印总行数 1839。
知道在这种情况下发生了什么吗?akka 流是否会在两者之间丢弃物品?
解决方案
您实际上并没有等待流完成。
您正在附加Sink.foreach(...)
使用to
删除阶段的处理细节Sink.foreach
并仅保留早期阶段的处理阶段的使用。
另外,请记住,您在每一步(via
、map
、via
然后to
)都在做同样的事情。因此,您只需要跟踪由FileIO.from(...)
. 这意味着您只是在等待读取完整文件,而不是等待任何后续处理步骤。
您只需要保留两者的结果并等待它们完成。
val stream =
lineSource
.via(parseLine)
.toMat(Sink.foreach(_.input))(Keep.both)
val resultFutures: (Future[IOResult], Future[Done]) = stream.run()
val resultsFuture = Future.sequence(List(resultFutures._1, resultFutures._2))
resultsFuture.onComplete {
case Success(List(ioResult, done)) =>
println(ioResult)
println(done)
println(counter0.get())
actorSystem.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
actorSystem.terminate()
}
或者,您可以选择仅跟踪最后一个处理阶段(Sink.foreach(...)
在这种情况下)
val stream =
lineSource
.via(parseLine)
.toMat(Sink.foreach(_.input))(Keep.right)
val resuleFuture: Future[Done] = stream.run()
resuleFuture.onComplete({
case Success(_) =>
println("Counter0 " + counter0.get())
actorSystem.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
actorSystem.terminate()
})
推荐阅读
- c# - 如何在 ASP.NET Core 3 中处理从 400 到 500 的异常
- bash - 如何使用 execv 合并命令?例如。ls | grep -c "表达式"
- java - Java OpenJDK 中缺少 JavaFX
- php - 当我尝试在我的项目 laravel 6.x 上使用 redis 时,它会显示此错误消息
- node.js - 将 nodejs MongoDB 连接代码移动到另一个文件
- javascript - 带有ajax json_encode的codeigniter不起作用
- asynchronous - 不能使用`impl Future`将异步函数存储在向量中
- odoo - 自定义小部件 js 无法识别来自 qweb 的模板
- ruby-on-rails - 安装 Rails 6.0.0 时出错:无法构建 gem 原生扩展
- cron - Quartz Web Restart 后的问题 - Quartz 触发所有已粘贴的计划作业