PySpark on EMR uses auto-broadcast (even though it is disabled) and a nested join for a simple SQL query

Problem description

I am running PySpark code on EMR, issuing queries through sqlContext.sql. One of the queries causes a driver.maxResultSize-related error to be raised. I called explain on the DataFrame produced by that query to understand why, and saw that Spark is, for some reason, using a broadcast with a nested loop join (without being told to). I would like to understand:

1) Why does Spark use a broadcast and a nested loop join to execute this query?

2) Why does the broadcast go through the driver?

3) How can I rewrite my code so that Spark does not use a broadcast (since the broadcast, or the fact that it goes through the driver, seems to be the root of the problem)?

The query that causes the problem:

df1.createOrReplaceTempView("temp_df_sql_view1")
df2.createOrReplaceTempView("temp_df_sql_view2")
# Get values from df1 that exist only in df1
df = sqlContext.sql("""SELECT * FROM temp_df_sql_view1 WHERE id NOT IN (SELECT id FROM temp_df_sql_view2)""")
df.explain()

The error message I get is: `Total size of serialized results of 79 tasks (2.1 GB) is bigger than spark.driver.maxResultSize (2.0 GB)`. driver.maxResultSize used to be 1g but was raised in an attempt to fix the error; however, the total size of the results seems to have grown along with it.
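On question 2: a broadcast exchange is materialized by first collecting every partition of the build side onto the driver and only then shipping it to the executors, and that collected data counts against spark.driver.maxResultSize, which would explain why raising the limit only raises the bar the build side has to clear. A rough pure-Python sketch of those mechanics (a simplified illustration, not Spark's actual internals):

```python
def broadcast_exchange(partitions, max_result_size):
    """Toy model of a broadcast exchange: the driver pulls every partition
    of the build side, and only then ships the result to each executor."""
    total = sum(len(p) for p in partitions)  # stand-in for serialized size
    if total > max_result_size:
        raise RuntimeError(
            f"Total size of serialized results ({total}) is bigger than "
            f"spark.driver.maxResultSize ({max_result_size})")
    return [row for p in partitions for row in p]  # collected on the driver

# Raising maxResultSize only raises the ceiling; the whole build side
# still has to fit on the driver before it can be broadcast.
assert broadcast_exchange([[1, 2], [3]], max_result_size=10) == [1, 2, 3]
```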

After realizing this was probably a broadcast problem, I disabled autoBroadcast:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf()
# This should've disabled auto-broadcast
conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

But calling explain() on df still shows the same plan as before, including the broadcast:

BroadcastNestedLoopJoin BuildLeft, LeftAnti, ((id#22 = id#19) || isnull((id#22 = id#19)))
:- BroadcastExchange IdentityBroadcastMode
:  +- *(1) FileScan parquet [id#22,data#23] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://bucket], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
+- Generate explode(id#2), false, [id#19]
   +- *(2) Scan ElasticsearchRelation(Map(...,org.apache.spark.sql.SQLContext@6caa1e7e,None) [id#2] PushedFilters: [], ReadSchema: struct<id:array<string>>

Tags: apache-spark pyspark amazon-emr broadcast

Solution


df = sqlContext.sql("""SELECT * FROM temp_df_sql_view1 WHERE id NOT IN (SELECT id FROM temp_df_sql_view2)""")
df.explain(True)

== Optimized Logical Plan ==
Join LeftAnti, ((id#134 = id#139) || isnull((id#134 = id#139)))
:- <left side>
+- <right side>

If you look at the optimized logical plan for this query, you will see that Spark has rewritten the `NOT IN` query as a LeftAnti join.
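The `((id#134 = id#139) || isnull((id#134 = id#139)))` condition in that LeftAnti join reflects SQL's three-valued `NOT IN` semantics: the predicate is UNKNOWN when either side is NULL (and `WHERE` drops UNKNOWN rows), and a single NULL in the subquery result makes `NOT IN` return no rows at all. Because the join condition is not a plain equality, Spark cannot plan a hash- or sort-based equi-join here. A small pure-Python model of those semantics (illustrative only, not Spark code):

```python
def sql_not_in(x, values):
    """Simulate SQL three-valued NOT IN: True, False, or None (UNKNOWN)."""
    if x is None:
        return None                       # NULL NOT IN (...) is UNKNOWN
    if x in values:
        return False                      # definite match -> NOT IN is false
    if any(v is None for v in values):
        return None                       # no match, but a NULL present -> UNKNOWN
    return True

# WHERE keeps only rows where the predicate is True; UNKNOWN is filtered out.
left = [1, 2, None]
assert [x for x in left if sql_not_in(x, [2, 3]) is True] == [1]
# One NULL in the subquery result makes NOT IN return no rows at all:
assert [x for x in left if sql_not_in(x, [2, None]) is True] == []
```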

To turn the optimized logical plan into a physical plan, Spark applies a set of strategies. For joins, Spark uses JoinSelection.

How it works is documented here: https://github.com/apache/spark/blob/aefb2e7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L326

      //   1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left
      //      side is broadcast-able and it's left join, or only right side is broadcast-able and
      //      it's right join, we skip this rule. If both sides are small, broadcasts the smaller
      //      side for inner and full joins, broadcasts the left side for right join, and broadcasts
      //      right side for left join.
      //   2. Pick cartesian product if join type is inner like.
      //   3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
      //      other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
      //      left side for right join, and broadcasts right side for left join.

As point 3 says, Spark falls back to a broadcast nested loop join as the final choice, even though no broadcast hint is present and the table size is larger than the broadcast threshold.
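Given that, a common way to sidestep the BroadcastNestedLoopJoin (question 3) is to rewrite the `NOT IN` as a `LEFT ANTI JOIN` (or a `NOT EXISTS`), e.g. `SELECT t1.* FROM temp_df_sql_view1 t1 LEFT ANTI JOIN temp_df_sql_view2 t2 ON t1.id = t2.id`; with a plain equi-condition Spark can pick a shuffled hash or sort-merge join instead. The rewrite is only equivalent when `id` is never NULL, as this pure-Python comparison of the two semantics shows (hypothetical helper, not a Spark API):

```python
def left_anti_join(left_ids, right_ids):
    """LEFT ANTI JOIN on equality: keep left rows with no matching right row.
    NULL = NULL is UNKNOWN in SQL, so left NULLs never match and are kept."""
    right = {r for r in right_ids if r is not None}
    return [l for l in left_ids if l is None or l not in right]

# With no NULLs on either side, LEFT ANTI JOIN returns the same rows as
# NOT IN, but it can be planned as an equi-join rather than a broadcast
# nested loop join:
assert left_anti_join([1, 2, 4], [2, 3]) == [1, 4]
# The two semantics diverge only when NULLs are involved (NOT IN would
# return no rows here, the anti join keeps the non-matching ones):
assert left_anti_join([1, None], [2, None]) == [1, None]
```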

