What is optimal in Spark: union then join, or join then union?

Problem Description

Given three different DataFrames: df1 and df2, which share the same schema, plus df3. All three DataFrames have one field in common.

Also assume that df1 and df2 each have roughly 42 million records, and df3 has roughly 100,000 records.

Which is optimal in Spark: union the two large DataFrames and then join with df3, or join each with df3 first and then union the results?

Tags: apache-spark, pyspark, apache-spark-sql, pyspark-sql

Solution


Honestly, at these volumes it does not really matter.

Looking at the .explain() output for both approaches, there is not much in it.

A broadcast join is evident in both cases. Moreover, the union does not cause a shuffle; at least nothing in your question implies transformations that would trigger one.
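As an aside (not part of the original answer), here is a minimal sketch of making the broadcast explicit rather than relying on spark.sql.autoBroadcastJoinThreshold; the names df1, df2, df3 and the join key "id" are assumed from the question:

import org.apache.spark.sql.functions.broadcast

// Hint the optimizer to broadcast the small df3 (~100k rows) so the join
// compiles to a BroadcastHashJoin even if the threshold has been lowered.
val joined = df1.union(df2).join(broadcast(df3), "id")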

That said, performance is, or should be, equal. See below for an approach with simulated DataFrames that demonstrates the points under discussion; mathematically there is not much to decide between them.

Approach 1

import org.apache.spark.sql.functions.{sha1, rand, col}
import spark.implicits._  // for the $"..." column syntax (pre-imported in spark-shell)

val randomDF1 = (spark.range(1, 42000000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val randomDF2 = (spark.range(1, 42000000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val randomDF3 = (spark.range(1, 100000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val u = randomDF1.union(randomDF2)
u.join(randomDF3, "id").explain()

== Physical Plan ==
*(4) Project [id#25284L, hash#25296, hash#25326]
+- *(4) BroadcastHashJoin [id#25284L], [id#25314L], Inner, BuildRight
   :- Union
   :  :- *(1) Project [id#25284L, sha1(cast(random_value#25286 as binary)) AS hash#25296]
   :  :  +- *(1) Project [id#25284L, cast(rand(10) as string) AS random_value#25286]
   :  :     +- *(1) Range (1, 42000000, step=1, splits=2)
   :  +- *(2) Project [id#25299L, sha1(cast(random_value#25301 as binary)) AS hash#25311]
   :     +- *(2) Project [id#25299L, cast(rand(10) as string) AS random_value#25301]
   :        +- *(2) Range (1, 42000000, step=1, splits=2)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13264]
   +- *(3) Project [id#25314L, sha1(cast(random_value#25316 as binary)) AS hash#25326]
      +- *(3) Project [id#25314L, cast(rand(10) as string) AS random_value#25316]
         +- *(3) Range (1, 100000, step=1, splits=2)
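Note the shape of the plan: a single BroadcastExchange of randomDF3 feeds one BroadcastHashJoin sitting on top of the Union, so the ~100k-row side is built and shipped once. As a hypothetical runtime check (not in the original answer), you can force execution with an action and time it:

// spark.time prints the wall-clock duration of the enclosed action.
spark.time {
  randomDF1.union(randomDF2).join(randomDF3, "id").count()
}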

Approach 2

import org.apache.spark.sql.functions.{sha1, rand, col}
import spark.implicits._  // for the $"..." column syntax (pre-imported in spark-shell)

val randomDF1 = (spark.range(1, 42000000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val randomDF2 = (spark.range(1, 42000000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val randomDF3 = (spark.range(1, 100000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val u1 = randomDF1.join(randomDF3, "id")
val u2 = randomDF2.join(randomDF3, "id")
u1.union(u2).explain()

== Physical Plan ==
Union
:- *(2) Project [id#25335L, hash#25347, hash#25377]
:  +- *(2) BroadcastHashJoin [id#25335L], [id#25365L], Inner, BuildRight
:     :- *(2) Project [id#25335L, sha1(cast(random_value#25337 as binary)) AS hash#25347]
:     :  +- *(2) Project [id#25335L, cast(rand(10) as string) AS random_value#25337]
:     :     +- *(2) Range (1, 42000000, step=1, splits=2)
:     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13409]
:        +- *(1) Project [id#25365L, sha1(cast(random_value#25367 as binary)) AS hash#25377]
:           +- *(1) Project [id#25365L, cast(rand(10) as string) AS random_value#25367]
:              +- *(1) Range (1, 100000, step=1, splits=2)
+- *(4) Project [id#25350L, hash#25362, hash#25377]
   +- *(4) BroadcastHashJoin [id#25350L], [id#25365L], Inner, BuildRight
      :- *(4) Project [id#25350L, sha1(cast(random_value#25352 as binary)) AS hash#25362]
      :  +- *(4) Project [id#25350L, cast(rand(10) as string) AS random_value#25352]
      :     +- *(4) Range (1, 42000000, step=1, splits=2)
      +- ReusedExchange [id#25365L, hash#25377], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13409]
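The ReusedExchange on the last line is the key detail: Spark builds the broadcast of randomDF3 once (id=#13409) and reuses it for the second join, so Approach 2 does essentially the same work as Approach 1. The matching hypothetical timing check (again, not in the original answer):

// Same action-based timing as for Approach 1, for a like-for-like comparison.
spark.time {
  randomDF1.join(randomDF3, "id")
    .union(randomDF2.join(randomDF3, "id"))
    .count()
}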
