python - Broadcast join in PySpark
Problem description
I am joining two dataframes: df1 (15k rows) and df2 (6 million rows). I have broadcast df1 and repartitioned df2 into 20 partitions, and spark.sql.shuffle.partitions is left at its default of 200. Now, when joining these two DFs, the data is shuffled again: I can see 200 tasks being generated, of which only 4 partitions actually contain data.
Ideally, since I have broadcast the small DF, no shuffle should be involved. But the join still produces 200 unevenly loaded tasks. I expected it not to repartition the data, because I already broadcast the table. As I can see in the explain plan, a SortMergeJoin is invoked.
from pyspark.sql import functions as F

def HC_LE_BM(BM, HC_LE):
    # Discourage sort-merge join in favour of shuffled hash join
    spark.conf.set('spark.sql.join.preferSortMergeJoin', 'false')
    print(spark.conf.get('spark.sql.join.preferSortMergeJoin'))
    df1 = HC_LE               # small DF (~15k rows)
    df2 = BM                  # large DF (~6M rows)
    df2 = df2.repartition(20).cache()
    print("Num partitions:", df2.rdd.getNumPartitions())
    print("Storage level:", df2.storageLevel)
    BENCH_HEIGHT_OFFSET = 25
    df3 = df2.join(
        F.broadcast(df1),
        (
            (df1.SRC_DIG_X >= df2.min_x) &
            (df1.SRC_DIG_X < df2.max_x) &
            (df1.SRC_DIG_Y >= df2.min_y) &
            (df1.SRC_DIG_Y < df2.max_y) &
            (df1.SRC_DIG_Z <= df2.min_z) &
            ((df1.SRC_DIG_Z + BENCH_HEIGHT_OFFSET) >= df2.max_z) &
            (df1.ORIGIN == df2.ORIGIN_BM)
        ),
        how='right'
    )
    df3.explain(True)
    df3.show(truncate=False)
Please find the log below.
== Physical Plan ==
SortMergeJoin [ORIGIN_BM#6341], [ORIGIN#6395], RightOuter, ((((((SRC_DIG_X#6519 >= min_x#6360) && (SRC_DIG_X#6519 < max_x#6361)) && (SRC_DIG_Y#6520 >= min_y#6362)) && (SRC_DIG_Y#6520 < max_y#6363)) && (SRC_DIG_Z#6521 <= min_z#6364)) && ((SRC_DIG_Z#6521 + 25.0) >= max_z#6365))
:- *(2) Sort [ORIGIN_BM#6341 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(ORIGIN_BM#6341, 200), true
:     +- *(1) Filter ((((((isnotnull(min_z#6364) && isnotnull(max_y#6363)) && isnotnull(max_x#6361)) && isnotnull(max_z#6365)) && isnotnull(min_y#6362)) && isnotnull(min_x#6360)) && isnotnull(ORIGIN_BM#6341))
:        +- InMemoryTableScan [ORIGIN_BM#6341, ORIGIN_LOCATION_BENCH_BM#6342, block_id#6343, centroid_x#6344, centroid_y#6345, centroid_z#6346, dim_x#6347, dim_y#6348, dim_z#6349, volume#6350, rktype_now#6351, rkzone_now#6352, densnow_2013#6353, category_dil#6354, class_dil#6355, btot_dil#6356, as_dil#6357, densnow_2015#6358, densnow_rma#6359, min_x#6360, max_x#6361, min_y#6362, max_y#6363, min_z#6364, max_z#6365], [isnotnull(min_z#6364), isnotnull(max_y#6363), isnotnull(max_x#6361), isnotnull(max_z#6365), isnotnull(min_y#6362), isnotnull(min_x#6360), isnotnull(ORIGIN_BM#6341)]
:           +- InMemoryRelation [ORIGIN_BM#6341, ORIGIN_LOCATION_BENCH_BM#6342, block_id#6343, centroid_x#6344, centroid_y#6345, centroid_z#6346, dim_x#6347, dim_y#6348, dim_z#6349, volume#6350, rktype_now#6351, rkzone_now#6352, densnow_2013#6353, category_dil#6354, class_dil#6355, btot_dil#6356, as_dil#6357, densnow_2015#6358, densnow_rma#6359, min_x#6360, max_x#6361, min_y#6362, max_y#6363, min_z#6364, max_z#6365], StorageLevel(disk, memory, deserialized, 1 replicas)
:              +- Exchange RoundRobinPartitioning(20), false
:                 +- *(1) FileScan parquet !ri.foundry.main.transaction.00000000-e6aa-5004-bd4a-ac6c2019674d:ri.foundry.main.transaction.00000000-e6aa-5004-bd4a-ac6c2019674d@00000000-4c09-7aa9-9c6c-752a1b9849cd:master.ri.foundry.main.dataset.667a9c8b-dde2-4524-acae-7f6c4183141e[ORIGIN_BM#6341,ORIGIN_LOCATION_BENCH_BM#6342,block_id#6343,centroid_x#6344,centroid_y#6345,centroid_z#6346,dim_x#6347,dim_y#6348,dim_z#6349,volume#6350,rktype_now#6351,rkzone_now#6352,densnow_2013#6353,category_dil#6354,class_dil#6355,btot_dil#6356,as_dil#6357,densnow_2015#6358,densnow_rma#6359,min_x#6360,max_x#6361,min_y#6362,max_y#6363,min_z#6364,max_z#6365] Batched: true, DataFilters: [], Format: Parquet, Location: FoundryCatalogFileIndex[sparkfoundry://.com/datasets/ri.foundry.main.datase..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct...
Solution