scala - Spark Dataframe 上的 val 与 def 性能
问题描述
以下代码,因此是一个关于性能的问题 - 当然可以大规模想象:
import org.apache.spark.sql.types.StructType
val df = sc.parallelize(Seq(
("r1", 1, 1),
("r2", 6, 4),
("r3", 4, 1),
("r4", 1, 2)
)).toDF("ID", "a", "b")
val ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)
// or
def ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)
df.withColumn("ones", ones).explain
下面是使用 def 和 val 时的两个物理计划 - 它们是相同的:
== Physical Plan == **def**
*(1) Project [_1#760 AS ID#764, _2#761 AS a#765, _3#762 AS b#766, (CASE WHEN (_2#761 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#762 = 1) THEN 1 ELSE 0 END) AS ones#770]
+- *(1) SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#760, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#761, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#762]
+- Scan[obj#759]
== Physical Plan == **val**
*(1) Project [_1#780 AS ID#784, _2#781 AS a#785, _3#782 AS b#786, (CASE WHEN (_2#781 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#782 = 1) THEN 1 ELSE 0 END) AS ones#790]
+- *(1) SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#780, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#781, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#782]
+- Scan[obj#779]
于是,就有了以下讨论:
val 与 def 性能。
然后:
我看不出 .explains 有什么不同。好的。
从其他地方: val 在定义时评估, def - 在调用时评估。
- 我假设这里使用 val 或 def 没有区别,因为它本质上是在一个循环中并且有一个 reduce。它是否正确?
- df.schema.map (c => c.name).drop(1)会在每个数据帧行中执行吗?当然没有必要。Catalyst 会对此进行优化吗?
- 如果上述情况是正确的,即每次都执行语句以处理要处理的列,那么我们如何才能使那段代码只出现一次呢?我们是否应该制作一个 val one = df.schema.map(c => c.name).drop(1)
- val, def 不仅仅是 Scala,也是 Spark 组件。
因此,我向 -1er 提出了这样的问题,因为以下内容非常清楚,但 val 的内容比下面的代码更多,并且下面的代码没有被迭代:
var x = 2 // using var as I need to change it to 3 later
val sq = x*x // evaluates right now
x = 3 // no effect! sq is already evaluated
println(sq)
解决方案
这里有两个核心概念,Spark DAG 创建和评估,以及 Scala 的val
vsdef
定义,这些是正交的
我看不出 .explains 有什么不同
您看不出有什么不同,因为从 Spark 的角度来看,查询是相同的。如果您将图形存储在 a 中val
或每次都使用def
.
从其他地方: val 在定义时评估, def - 在调用时评估。
这是 Scala 语义。Aval
是一个不可变的引用,它在声明站点被评估一次。Adef
代表方法定义,如果你DataFrame
在里面分配一个新的,每次调用它都会创建一个。例如:
def ones =
df
.schema
.map(c => c.name)
.drop(1)
.map(x => when(col(x) === 1, 1).otherwise(0))
.reduce(_ + _)
val firstcall = ones
val secondCall = ones
上面的代码将在 DF 上构建两个单独的 DAG。
我假设这里使用 val 或 def 没有区别,因为它本质上是在一个循环中并且有一个 reduce。它是否正确?
我不确定您在谈论哪个循环,但是请参阅上面的答案以了解两者之间的区别。
df.schema.map(c => c.name).drop(1) 会在每个数据帧行中执行吗?当然没有必要。Catalyst 会对此进行优化吗?
不,drop(1)
会发生在整个数据帧上,这实际上会使其仅删除第一行。
如果上述情况是正确的,即每次都执行语句以处理要处理的列,那么我们如何使那段代码只出现一次?我们是否应该制作一个 val one = df.schema.map(c => c.name).drop(1)
每个数据帧只发生一次(在您的示例中,我们恰好有一个)。
推荐阅读
- php - PHP - 替换字符串中的字符
- ios - Firestore 读取旧数据
- runtime-error - Spotify Web API 错误 500
- android - Kotlin 按当前注释类过滤 memberProperties
- angular - 在ionic 3中将音频文件转换为base64
- c++ - 我如何在 C++ 中获取用户输入来完成我的 system() 函数
- javascript - 当您使用 JavaScript 单击按钮时,使我的按钮变黑
- javascript - html / javascript - 跳转到锚点不起作用
- msbuild - 使用新的 Buildsystem 一次构建多个“配置”
- tensorflow - Tensorflow:使用冻结图修改节点操作(图编辑)