首页 > 解决方案 > 在 Scala 中解析大型 CSV 文件

问题描述

我有一个 CSV 文件(~1gb,1000 万行)。我想分批解析它,然后将输出写入新的文本/CSV 文件,其中每一行都是一个 JSON 数组。

val outputFile = new File(outputFileName)
val bw = new BufferedWriter(new FileWriter(file))

Source
  .fromFile(fileName)
  .getLines()
  .grouped(batchSize)
  .foreach(chunk => {
    val jsonArray = doChunkTransformation(chunk)

    bw.write(jsonArray)
  })

bw.close()

这种方法有效吗?或者也许我应该用 包裹它Future?我对 Scala 很陌生,所以也许我不知道所有的方法和解决方案?

重要说明
不幸的是,我的能力有限,不能使用任何外部库,因此必须用纯 Scala 编写解决方案。

标签: scala

解决方案


我能想到的唯一优化 - 并行doChunkTransformation执行,当然如果它是相对昂贵的操作,否则它没有意义,因为 IO 会花费更多。例如:

        implicit val ec = scala.concurrent.ExecutionContext.global
        val paralelism = Runtime.getRuntime().availableProcessors() * 2

        val outputFile = new File(outputFileName)
        val bw = new BufferedWriter(new FileWriter(file))
        val timeout = 

        Source
        .fromFile(fileName)
        .getLines()
        .grouped(batchSize * paralelism)
        .foreach(chunk => {
            //Run computations in paralel.
            // Note parallelism level depends on exact `ExecutionContext` implementation 
            // In this example it will be equal to amount of processors multiplied by 2
            // Future.traverse preserve order
            val computations = Future.traverse(chunk.grouped(paralelism)) (smallChunk => Future(doChunkTransformation(smallChunk))) 
            val chunks = Await.result(computations).map(_.flatten), 1 minute)
            bw.write(jsonArray)
        })

        bw.close()

UPD:我没有Future.traverse在文档中找到关于保留原始顺序的任何提及:https ://www.scala-lang.org/api/current/scala/concurrent/Future.html 但我已经准备了显示顺序仍然存在的示例同样:https ://scastie.scala-lang.org/DUvwX1CXTl2kxrcez3uDgw

import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.Future
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

import scala.language.postfixOps

val random = new Random()
val computes = Future.traverse((1 to 10).toList) { num: Int => 
  val delay = random.nextInt(1000)
  println(s"Delay is $delay")
  Thread.sleep(delay)
  Future(num.toString)
}
println(Await.result(computes, 1 minute))

最后打印出来:List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


推荐阅读