apache-spark - emr 上的 pyspark 使用自动广播(即使已禁用)和用于简单 sql 查询的嵌套连接
问题描述
使用 sqlContext.sql 查询在 EMR 上运行 pyspark 代码。其中一个查询导致引发 driver.maxResultSize 相关错误。尝试对查询产生的数据框使用解释以了解原因。在那里,我看到 spark 出于某种原因使用带有嵌套连接的广播(没有明确的说明)。我想了解:
1)为什么spark使用广播和嵌套连接来执行这个查询?
2)为什么广播要经过驱动程序?
3)我如何重写我的代码,使火花不会使用广播(因为广播,或者它通过驱动程序,似乎是问题的根源)?
导致问题的查询:
df1.createOrReplaceTempView("temp_df_sql_view1")
df2.createOrReplaceTempView("temp_df_sql_view2")
# Get values from df1 that exist only in df1
df = sqlContext.sql("""SELECT * FROM temp_df_sql_view1 WHERE id NOT IN (SELECT id FROM temp_df_sql_view2)""")
df.explain()
我得到的错误消息是:Total size of serialized results of 79 tasks (2.1 GB) is bigger than spark.driver.maxResultSize (2.0 GB)
即使 driver.maxResultSize 曾经是 1g,但为了修复错误而被放大。然而,结果的总规模似乎随之扩大。
在意识到这可能是一个广播问题后,我禁用了 autoBroadcast:
conf = SparkConf()
# This should've disabled auto-broadcast
conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
但是在 df 上使用 explain() 仍然显示相同的以下计划(包括广播):
BroadcastNestedLoopJoin BuildLeft, LeftAnti, ((id#22 = id#19) || isnull((id#22 = id#19)))
:- BroadcastExchange IdentityBroadcastMode
: +- *(1) FileScan parquet [id#22,data#23] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://bucket], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
+- Generate explode(id#2), false, [id#19]
+- *(2) Scan ElasticsearchRelation(Map(...,org.apache.spark.sql.SQLContext@6caa1e7e,None) [id#2] PushedFilters: [], ReadSchema: struct<id:array<string>>
None
解决方案
df = sqlContext.sql("""SELECT * FROM temp_df_sql_view1 WHERE id NOT IN (SELECT id FROM temp_df_sql_view2)""")
df.explain(true)
== Optimized Logical Plan ==
Join LeftAnti, ((id#134 = id#139) || isnull((id#134 = id#139)))
:- <left side>
+- <right side>
如果您看到此查询的优化计划,您将看到 Spark 已将not in
查询转换为 LeftAnti Join。
为了将优化的逻辑计划转换为物理计划,Spark 使用了一些strategy
. 对于联接,Spark 使用JoinSelection
.
它的工作方式记录在这里 - https://github.com/apache/spark/blob/aefb2e7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L326
// 1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left
// side is broadcast-able and it's left join, or only right side is broadcast-able and
// it's right join, we skip this rule. If both sides are small, broadcasts the smaller
// side for inner and full joins, broadcasts the left side for right join, and broadcasts
// right side for left join.
// 2. Pick cartesian product if join type is inner like.
// 3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
// left side for right join, and broadcasts right side for left join.
正如第 3 点所述,它回退到广播连接(甚至认为广播提示不存在并且 tableSize > broadcastThreshold)。
推荐阅读
- port - FTPS白名单端口列表
- wordpress - 在 wordpress/woosommerce 中隐藏某些产品类别
- android - 即使文本大小发生变化,Android如何保持相同的textView高度?
- python - 生成嵌套列表的树库
- vue.js - 移动了对象内部的一些属性,相关事件不再触发
- php - 控制台命令内的Laravel光标分页循环
- c# - 在 acumatica 中,如何获取选择器的值并使用它来填充另一个字段?
- sql - 如何从列中获取特定的部分字符串
- reactjs - 如何在 React(前)和 laravel(后)中使用 SmartCard 实现 2FA
- sql - 如何使用 SQLAlchemy 执行 SQL 查询,以便稍后将其传递到 pandas 数据框