scala - Scala:使用(并行)方法加快文件的字数统计
问题描述
我需要在 scala 中对多达一百万行的输入文件运行字数统计。每行也很长(> 150K 个字符)。以下是有效的标准程序:
val wordCount = scala.io.Source.fromFile("sample.dat")
.getLines
.flatMap(_.split("\\W+"))
.foldLeft(Map.empty[String, Int]){
(count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
}
以下修改失败并出现错误,value par is not a member of Iterator[String]
val wordCount = scala.io.Source.fromFile("sample.dat")
.getLines
.flatMap(_.split("\\W+"))
.par
.foldLeft(Map.empty[String, Int]){
(count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
}
我对此感到惊讶,因为类似的程序似乎有效。
此外,我想知道是否par.reduce
会比工作更快、更高效par.foldLeft
。
在此问题上提供任何帮助或线索将不胜感激。
TIA
解决方案
看看这个。并行集合从 2.13 中消失了,尽管有一个可以使用的外部库。不过,我说,如果您希望并行处理大量数据,只需使用 spark (特别是,如果您无论如何都需要外部库(您可以在单个节点上运行 spark ......当你说,你需要这个“用于测试”的解决方案,你想测试一个解决方案,然后运行一个完全不同的解决方案,这感觉很奇怪)。
这是一个没有外部库的解决方案(只是为了完整性):
// First, create a local execution context to allow throttling the parallel jobs
val parallelism = 4 // how many chunks to process in parallel
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(
parallelism, parallelism, 0L, TimeUnit.SECONDS,
new ArrayBlockingQueue[Runnable](parallelism) {
override def offer(e: Runnable) = {
put(e)
true
}
}
))
// Now just split input into chunks and send to the executor
// This does not read anything into memory yet
val chunkSize = 4096 // how many lines to process at once
val jobs = source
.getLines
.grouped(chunkSize)
.map { chunk =>
Future {
chunk
.flatMap { _.split("""\W+""") }
.foldLeft(Map.empty[String, Int]) { case (m, w) =>
m + (w -> (m.getOrElse(w, 0) + 1))
}
}
}
// Now, combine the results.
// This will fetch `parallelism*chunkSize` lines into memory and start
// parallelism jobs processing the chunks. Once one of the jobs completes,
// it will read next `chunkSize` lines, and start another job. Etc.
val result: Future[Map[String, Int]] = Future.sequence(jobs.toSeq).map {
_.reduce { (m1, m2) =>
m1.foldLeft(m2){ case (m, (w, v)) => m + (w -> (m.getOrElse(w,0) + v)) }
}
}
实现这一点的关键是ec
限制当前“正在运行”的期货数量的实施。您可以将它和分块逻辑包装到一个小型实用程序类中,并根据需要使其可重用。不过,如果我是你,我仍然会使用 spark。
推荐阅读
- c - *s++ 中发生了什么?
- sql - Postgres 数组列不小心变成了文本,如何将其转换回文本数组?
- java - 当第一个实体是具有指定主键的实体之后的以下实体时,如何使用 JPA 获取列表
- nginx - Openshift 中的 NGINX - NGINX 无法解析内部主机名
- android - 可点击的父母和孩子
- python - 'ValueError:无法将输入数组从形状(5,5)广播到形状(5)'用于 scipy.optimize-minimize
- vb.net - Visual Basic 计算器
- compiler-errors - gfortran:错误:-fPIC -ffree-line-length-0:没有这样的文件或目录
- matlab - 图像中超像素的相邻和非相邻超像素
- c# - 在任务期间使面板不可见