首页 > 解决方案 > Scala:使用(并行)方法加快文件的字数统计

问题描述

我需要在 scala 中对多达一百万行的输入文件运行字数统计。每行也很长(> 150K 个字符)。以下是有效的标准程序:

val wordCount = scala.io.Source.fromFile("sample.dat")
.getLines
.flatMap(_.split("\\W+"))
.foldLeft(Map.empty[String, Int]){
    (count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
}

以下修改失败并出现错误,value par is not a member of Iterator[String]

val wordCount = scala.io.Source.fromFile("sample.dat")
.getLines
.flatMap(_.split("\\W+"))
.par
.foldLeft(Map.empty[String, Int]){
    (count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
}

我对此感到惊讶,因为类似的程序似乎有效。

此外,我想知道是否par.reduce会比工作更快、更高效par.foldLeft

在此问题上提供任何帮助或线索将不胜感激。

TIA

标签: scalaparallel-processing

解决方案


看看这个。并行集合从 2.13 中消失了,尽管有一个可以使用的外部库。不过,我说,如果您希望并行处理大量数据,只需使用 spark (特别是,如果您无论如何都需要外部库(您可以在单个节点上运行 spark ......当你说,你需要这个“用于测试”的解决方案,你想测试一个解决方案,然后运行一个完全不同的解决方案,这感觉很奇怪)。

这是一个没有外部库的解决方案(只是为了完整性):

    // First, create a local execution context to allow throttling the parallel jobs 
   val parallelism = 4 // how many chunks to process in parallel

   implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(
    parallelism, parallelism, 0L, TimeUnit.SECONDS,
    new ArrayBlockingQueue[Runnable](parallelism) {
      override def offer(e: Runnable) = {
        put(e)
        true
      }
    }
  ))

   // Now just split input into chunks and send to the executor
   // This does not read anything into memory yet
   
   val chunkSize = 4096 // how many lines to process at once

    val jobs = source 
     .getLines
     .grouped(chunkSize)
     .map { chunk => 
       Future {
          chunk
            .flatMap { _.split("""\W+""") }
            .foldLeft(Map.empty[String, Int]) { case (m, w) => 
                m + (w -> (m.getOrElse(w, 0) + 1))
            }
       }
     }

  // Now, combine the results.
  // This will fetch `parallelism*chunkSize` lines into memory and start
  // parallelism jobs processing the chunks. Once one of the jobs completes, 
  // it will read next `chunkSize` lines, and start another job. Etc.
    val result: Future[Map[String, Int]] = Future.sequence(jobs.toSeq).map { 
      _.reduce { (m1, m2) =>
        m1.foldLeft(m2){ case (m, (w, v)) => m + (w -> (m.getOrElse(w,0) + v)) }
      }
  }

实现这一点的关键是ec限制当前“正在运行”的期货数量的实施。您可以将它和分块逻辑包装到一个小型实用程序类中,并根据需要使其可重用。不过,如果我是你,我仍然会使用 spark。


推荐阅读