pyspark - 在 pyspark 中的 SQL 中广播多个视图
问题描述
我想在加入大表时在多个小表上使用 BROADCAST 提示。在下面的示例中,SMALLTABLE2 在不同的连接列上与 LARGETABLE 多次连接。现在为了获得更好的性能,我希望 SMALLTABLE1 和 SMALLTABLE2 都被广播。这可以通过简单地添加提示 /* BROADCAST (B,C,D,E) */ 来实现,还是有更好的解决方案?SMALLTABLE1 & SMALLTABLE2 我通过查询数据框中的 HIVE 表来获取数据,然后使用 createOrReplaceTempView 创建一个视图作为 SMALLTABLE1 & SMALLTABLE2; 稍后在如下查询中使用。
是否有使用 createOrReplaceTempView 函数创建的 BROADCASTING 视图?
SELECT A.COL1, A.COL2, A.COL3, B.COL4, C.COL5, D.COL6, E.COL7
FROM LARGETABLE A
JOIN SMALLTABLE1 B
ON A.LCOL = B.SCOL
JOIN SMALLTABLE2 C
ON A.LCOL1 = C.SCOL
JOIN SMALLTABLE2 D
ON A.LCOL2 = D.SCOL
JOIN SMALLTABLE2 E
ON A.LCOL3 = E.SCOL
解决方案
如果您使用的是 spark 2.2+,那么您可以使用这些MAPJOIN/BROADCAST/BROADCASTJOIN
提示中的任何一个。
示例:下面我使用了广播,但您可以使用mapjoin/broadcastjoin提示将产生相同的解释计划。
>>> spark.range(1000000000).createOrReplaceTempView("t")
>>> spark.range(1000000000).createOrReplaceTempView("u")
>>>sql("select /*+ Broadcast(t,u) */* from t join u on t.id=u.id").explain()
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#16L], Inner, BuildRight
:- *Range (0, 1000000000, step=1, splits=56)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 1000000000, step=1, splits=56)
(或者)
如果您使用的是 Spark < 2,那么我们需要使用数据帧 API 来持久化,然后注册为临时表,我们可以在内存连接中实现。
>>> df=hc.range(10000000)
>>> df.persist() --persist the df in memory
>>> df.registerTempTable("u") --register temp table
>>> df1=hc.range(10000000)
>>> df1.persist()
>>> df1.registerTempTable("t")
>>> hc.sql("select * from t join u on t.id=u.id").explain()
== Physical Plan ==
Project [id#11L,id#26L]
+- SortMergeJoin [id#11L], [id#26L]
:- Sort [id#11L ASC], false, 0
: +- TungstenExchange hashpartitioning(id#11L,200), None
: +- InMemoryColumnarTableScan [id#11L], InMemoryRelation [id#11L], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
+- Sort [id#26L ASC], false, 0
+- TungstenExchange hashpartitioning(id#26L,200), None
+- InMemoryColumnarTableScan [id#26L], InMemoryRelation [id#26L], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
通过使用 DataFrames 而不创建任何临时表
>>>from pyspark.sql.functions import *
>>> df=hc.range(10000000)
>>> df1=hc.range(10000000)
>>> df.join(broadcast(df1),['id']).explain()
== Physical Plan ==
Project [id#26L]
+- BroadcastHashJoin [id#26L], [id#11L], BuildRight
:- ConvertToUnsafe
: +- Scan ExistingRDD[id#26L]
+- ConvertToUnsafe
+- Scan ExistingRDD[id#11L]
此外,广播连接在 Spark 中自动完成。
有一个参数是“ spark.sql.autoBroadcastJoinThreshold ”,默认设置为10mb。
然后更改默认值
conf.set("spark.sql.autoBroadcastJoinThreshold", 1024*1024*<mb_value>)
有关更多信息,请参阅有关spark.sql.autoBroadcastJoinThreshold的链接。
推荐阅读
- google-apps-script - 谷歌电子表格的基于时间的触发器
- flutter - 使用 iOS 模拟器测试 plaid_flutter 时没有弹出键盘
- amazon-web-services - Codebuild 中的 Codepipeline S3 存储桶访问被拒绝
- sql - 根据开始/结束选择指定日期之前和之后的连续日期计数
- django - Django没有完全按照表格中输入的方式存储数据
- visual-studio - 如何克服“运行应用程序前请选择有效设备”。在 Windows 10 上与 Mac Visual Studio 配对
- django - 如何在 Django 中获取动态 HTML 表单字段的名称和值?
- python - 如何为来自 uNet 的预测图像设置阈值
- android - 无法找到包含 Intent 在片段中时的配置根
- css - 即使设置为填充可用空间,弹性框中的按钮也有很小的边距