scala - 在 Scala 中解析大型 CSV 文件
问题描述
我有一个 CSV 文件(~1gb,1000 万行)。我想分批解析它,然后将输出写入新的文本/CSV 文件,其中每一行都是一个 JSON 数组。
val outputFile = new File(outputFileName)
val bw = new BufferedWriter(new FileWriter(file))
Source
.fromFile(fileName)
.getLines()
.grouped(batchSize)
.foreach(chunk => {
val jsonArray = doChunkTransformation(chunk)
bw.write(jsonArray)
})
bw.close()
这种方法有效吗?或者也许我应该用 包裹它Future
?我对 Scala 很陌生,所以也许我不知道所有的方法和解决方案?
重要说明
不幸的是,我的能力有限,不能使用任何外部库,因此必须用纯 Scala 编写解决方案。
解决方案
我能想到的唯一优化 - 并行doChunkTransformation
执行,当然如果它是相对昂贵的操作,否则它没有意义,因为 IO 会花费更多。例如:
implicit val ec = scala.concurrent.ExecutionContext.global
val paralelism = Runtime.getRuntime().availableProcessors() * 2
val outputFile = new File(outputFileName)
val bw = new BufferedWriter(new FileWriter(file))
val timeout =
Source
.fromFile(fileName)
.getLines()
.grouped(batchSize * paralelism)
.foreach(chunk => {
//Run computations in paralel.
// Note parallelism level depends on exact `ExecutionContext` implementation
// In this example it will be equal to amount of processors multiplied by 2
// Future.traverse preserve order
val computations = Future.traverse(chunk.grouped(paralelism)) (smallChunk => Future(doChunkTransformation(smallChunk)))
val chunks = Await.result(computations).map(_.flatten), 1 minute)
bw.write(jsonArray)
})
bw.close()
UPD:我没有Future.traverse
在文档中找到关于保留原始顺序的任何提及:https ://www.scala-lang.org/api/current/scala/concurrent/Future.html
但我已经准备了显示顺序仍然存在的示例同样:https ://scastie.scala-lang.org/DUvwX1CXTl2kxrcez3uDgw
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.Future
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random
import scala.language.postfixOps
val random = new Random()
val computes = Future.traverse((1 to 10).toList) { num: Int =>
val delay = random.nextInt(1000)
println(s"Delay is $delay")
Thread.sleep(delay)
Future(num.toString)
}
println(Await.result(computes, 1 minute))
最后打印出来:List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
推荐阅读
- ansible - 如何在没有 IP 地址的情况下使用 Ansible 更新客户端机器
- excel - 如何将电子邮件从子文件夹导出到 Excel
- apache-spark - 我可以使用 Airflow 启动/停止火花流作业吗
- blockchain - Restrict function access ONLY to one other specific contract in Solidity
- android - WalletConnect 是否支持“币安智能链(BEP20)”钱包?
- python - How get name of Toplevel window of another Toplevel?
- react-native - 尝试在本机反应中仅显示堆栈导航标题时关闭抽屉标题
- c# - 将所有值从列表添加到列表列表
- php - 2 周内未订购的 woocommerce 用户 sql 查询
- snowflake-cloud-data-platform - 检索覆盖视图 SnowFlake