首页 > 解决方案 > Spark Dataframe 上的 val 与 def 性能

问题描述

以下代码,因此是一个关于性能的问题 - 当然可以大规模想象:

import org.apache.spark.sql.types.StructType

val df = sc.parallelize(Seq(
   ("r1", 1, 1),
   ("r2", 6, 4),
   ("r3", 4, 1),
   ("r4", 1, 2)
   )).toDF("ID", "a", "b")

val ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

// or

def ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

df.withColumn("ones", ones).explain

下面是使用 def 和 val 时的两个物理计划 - 它们是相同的:

 == Physical Plan == **def**
 *(1) Project [_1#760 AS ID#764, _2#761 AS a#765, _3#762 AS b#766, (CASE WHEN (_2#761 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#762 = 1) THEN 1 ELSE 0 END) AS ones#770]
 +- *(1) SerializeFromObject [staticinvoke(class 
 org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#760, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#761, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#762]
   +- Scan[obj#759]


 == Physical Plan == **val**
 *(1) Project [_1#780 AS ID#784, _2#781 AS a#785, _3#782 AS b#786, (CASE WHEN (_2#781 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#782 = 1) THEN 1 ELSE 0 END) AS ones#790]
 +- *(1) SerializeFromObject [staticinvoke(class 
 org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#780, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#781, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#782]
    +- Scan[obj#779] 

于是,就有了以下讨论:

val 与 def 性能。

然后:

因此,我向 -1er 提出了这样的问题,因为以下内容非常清楚,但 val 的内容比下面的代码更多,并且下面的代码没有被迭代:

var x = 2 // using var as I need to change it to 3 later
val sq = x*x // evaluates right now
x = 3 // no effect! sq is already evaluated
println(sq)

标签: scalaapache-spark

解决方案


这里有两个核心概念,Spark DAG 创建和评估,以及 Scala 的valvsdef定义,这些是正交的

我看不出 .explains 有什么不同

您看不出有什么不同,因为从 Spark 的角度来看,查询是相同的。如果您将图形存储在 a 中val或每次都使用def.

从其他地方: val 在定义时评估, def - 在调用时评估。

这是 Scala 语义。Aval是一个不可变的引用,它在声明站点被评估一次。Adef代表方法定义,如果你DataFrame在里面分配一个新的,每次调用它都会创建一个。例如:

def ones = 
  df
   .schema
   .map(c => c.name)
   .drop(1)
   .map(x => when(col(x) === 1, 1).otherwise(0))
   .reduce(_ + _)

val firstcall = ones
val secondCall = ones

上面的代码将在 DF 上构建两个单独的 DAG。

我假设这里使用 val 或 def 没有区别,因为它本质上是在一个循环中并且有一个 reduce。它是否正确?

我不确定您在谈论哪个循环,但是请参阅上面的答案以了解两者之间的区别。

df.schema.map(c => c.name).drop(1) 会在每个数据帧行中执行吗?当然没有必要。Catalyst 会对此进行优化吗?

不,drop(1)会发生在整个数据帧上,这实际上会使其仅删除第一行。

如果上述情况是正确的,即每次都执行语句以处理要处理的列,那么我们如何使那段代码只出现一次?我们是否应该制作一个 val one = df.schema.map(c => c.name).drop(1)

每个数据帧只发生一次(在您的示例中,我们恰好有一个)。


推荐阅读