首页 > 解决方案 > Akka 流未处理我的 CSV 文件中的所有行

问题描述

我正在使用 Akka 流来处理包含 1839 行的 CSV 文件。我添加了计数器来计算处理的行数。

这是我的来源,我确保输入文件中的每一行少于 700 个字符。

case class ParsedLine(input: String, field1: String, field2: String, field3: String)


val counter0 = new AtomicInteger()
val counter1 = new AtomicInteger()

val lineSource = FileIO
    .fromPath(Paths.get(InputFile))
    .via(Framing.delimiter(ByteString("\n"), 1024, allowTruncation = true))
    .map { l =>
      counter0.incrementAndGet()
      l.utf8String
    }

 val parseLine = Flow[String].map { l =>
     val words = l.split(",")
     ParsedLine(l, words(0), words(1), words(2))
 }

这个源的处理如下,对应源中的每一行,输出中应该有一个处理过的行。

val done = lineSource
    .via(parseLine)
    .to(Sink.foreach(_.input))
    .run()


  done.onComplete {
    case Success(_) =>
      println("Counter0 " + counter0.get())
      println("Counter1 " + counter1.get())
      system.terminate()
    case Failure(e) =>
      println(e.getLocalizedMessage)
      system.terminate()
  }

有趣的是计数器打印如下,每次我得到不同的数字。如果我删除该.to(Sink.foreach(_.input))行,我得到的计数为 1839。

Counter0 1445
Counter1 1667

首先,我希望 Counter0 的值高于 Counter1,因为 Counter0 位于 Counter1 之前的阶段,我希望所有行都被处理并且计数器应该打印总行数 1839。

知道在这种情况下发生了什么吗?akka 流是否会在两者之间丢弃物品?

标签: scalacsvakkaakka-streamthrottling

解决方案


您实际上并没有等待流完成。

您正在附加Sink.foreach(...)使用to删除阶段的处理细节Sink.foreach并仅保留早期阶段的处理阶段的使用。

另外,请记住,您在每一步(viamapvia然后to)都在做同样的事情。因此,您只需要跟踪由FileIO.from(...). 这意味着您只是在等待读取完整文件,而不是等待任何后续处理步骤。

您只需要保留两者的结果并等待它们完成。

val stream =
 lineSource
   .via(parseLine)
   .toMat(Sink.foreach(_.input))(Keep.both)

val resultFutures: (Future[IOResult], Future[Done]) = stream.run()

val resultsFuture = Future.sequence(List(resultFutures._1, resultFutures._2))

resultsFuture.onComplete {
  case Success(List(ioResult, done)) =>
    println(ioResult)
    println(done)
    println(counter0.get())
    actorSystem.terminate()
  case Failure(e) =>
    println(e.getLocalizedMessage)
    actorSystem.terminate()
}

或者,您可以选择仅跟踪最后一个处理阶段(Sink.foreach(...)在这种情况下)

val stream =
  lineSource
    .via(parseLine)
    .toMat(Sink.foreach(_.input))(Keep.right)

val resuleFuture: Future[Done] = stream.run()

resuleFuture.onComplete({
  case Success(_) =>
    println("Counter0 " + counter0.get())
    actorSystem.terminate()
  case Failure(e) =>
    println(e.getLocalizedMessage)
    actorSystem.terminate()
})

推荐阅读