scala - Spark 遍历数据框行、单元格
问题描述
(Spark 初学者)我编写了下面的代码来迭代数据框的行和列(Spark 2.4.0 + Scala 2.12)。我已经计算了行数和单元格数作为健全性检查。我惊讶地发现该方法返回0,即使计数器在迭代期间递增。
准确地说:当代码运行时,它会打印显示它已经找到的消息
- 行
10, 20, ..., 610
- 正如预期的那样。 - 细胞
100, 200, ..., 1580
- 正如预期的那样。
迭代完成后,它会打印“Found 0 cells”,并返回0。
我知道 Spark 是一个分布式处理引擎,并且代码并没有完全按照编写的方式执行 - 但我应该如何看待这段代码?
行/单元格计数只是一个健全性检查;实际上,我需要遍历数据并积累一些结果,但是如何防止 Spark 在迭代完成后立即将结果归零?
def processDataFrame(df: sql.DataFrame): Int = {
var numRows = 0
var numCells = 0
df.foreach { row =>
numRows += 1
if (numRows % 10 == 0) println(s"Found row $numRows") // prints 10,20,...,610
row.toSeq.foreach { c =>
if (numCells % 100 == 0) println(s"Found cell $numCells") // prints 100,200,...,15800
numCells += 1
}
}
println(s"Found $numCells cells") // prints 0
numCells
}
解决方案
Spark 具有累加器变量,可为您提供分布式环境中的计数等功能。您可以使用简单的 long 和 int 类型的累加器。甚至累加器的自定义数据类型也可以在 Spark 中轻松实现。
在您的代码中,将计数变量更改为如下所示的累加器变量将为您提供正确的结果。
val numRows = sc.longAccumulator("numRows Accumulator") // string name only for debug purpose
val numCells = sc.longAccumulator("numCells Accumulator")
df.foreach { row =>
numRows.add(1)
if (numRows.value % 10 == 0) println(s"Found row ${numRows.value}") // prints 10,20,...,610
row.toSeq.foreach { c =>
if (numCells.value % 100 == 0) println(s"Found cell ${numCells.value}") // prints 100,200,...,15800
numCells.add(1)
}
}
println(s"Found ${numCells.value} cells") // prints 0
numCells.value
推荐阅读
- java - 在 Java 中获取系统日期时间更新日志
- laravel - Valet 基于权限的问题(看起来如此)
- reactjs - 输入范围滑块作为反应挂钩不滑动
- javascript - 如何检查一个字符串是否包含在js或jquery中的另一个字符串中
- swift - 在表格视图中选择和取消选择行时发出执行功能
- sql - 日期和时间差 (hh:mm:ss) 并根据 where 条件对 Duration 求和
- python - How to remove background gray color when printing PDF using ghostscript?
- mongodb - Mocking official MongoDb driver
- python-3.x - Writing optimized multiple conditional statements Or use any other way to check condition? in python
- amazon-web-services - 如何将 HTTPS 网站的托管转移到 EC2?