首页 > 解决方案 > Spark 遍历数据框行、单元格

问题描述

(Spark 初学者)我编写了下面的代码来迭代数据框的行和列(Spark 2.4.0 + Scala 2.12)。我已经计算了行数和单元格数作为健全性检查。我惊讶地发现该方法返回0,即使计数器在迭代期间递增。

准确地说:当代码运行时,它会打印显示它已经找到的消息

迭代完成后,它会打印“Found 0 cells”,并返回0

我知道 Spark 是一个分布式处理引擎,并且代码并没有完全按照编写的方式执行 - 但我应该如何看待这段代码?

行/单元格计数只是一个健全性检查;实际上,我需要遍历数据并积累一些结果,但是如何防止 Spark 在迭代完成后立即将结果归零?

  def processDataFrame(df: sql.DataFrame): Int = {
    var numRows = 0
    var numCells = 0
    df.foreach { row =>
      numRows += 1
      if (numRows % 10 == 0) println(s"Found row $numRows") // prints 10,20,...,610
      row.toSeq.foreach { c =>
        if (numCells % 100 == 0) println(s"Found cell $numCells") // prints 100,200,...,15800
        numCells += 1
      }
    }
    println(s"Found $numCells cells") // prints 0
    numCells
  }

标签: scalaapache-spark

解决方案


Spark 具有累加器变量,可为您提供分布式环境中的计数等功能。您可以使用简单的 long 和 int 类型的累加器。甚至累加器的自定义数据类型也可以在 Spark 中轻松实现。
在您的代码中,将计数变量更改为如下所示的累加器变量将为您提供正确的结果。

val numRows = sc.longAccumulator("numRows Accumulator")  // string name only for debug purpose
val numCells = sc.longAccumulator("numCells Accumulator")
df.foreach { row =>
  numRows.add(1)
  if (numRows.value % 10 == 0) println(s"Found row ${numRows.value}") // prints 10,20,...,610
  row.toSeq.foreach { c =>
    if (numCells.value % 100 == 0) println(s"Found cell ${numCells.value}") // prints 100,200,...,15800
    numCells.add(1)
  }
}
println(s"Found ${numCells.value} cells") // prints 0
numCells.value

推荐阅读