Broadcast join in PySpark

Problem description

I am joining two DataFrames, df1 (15k rows) and df2 (6 million rows). I have broadcast df1 and repartitioned df2 into 20 partitions.

spark.sql.shuffle.partitions is left at its default of 200. Now, when these two DataFrames are joined, the data is shuffled again: I can see 200 tasks being generated, of which only 4 partitions actually contain data.
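For reference, a minimal sketch (mine, not part of the original post) of how the shuffle parallelism could be lowered so that the post-join exchange produces fewer than the default 200 tasks; the value 20 is only an illustration matching the repartition count mentioned above.

# Assumes an existing SparkSession named `spark`.
spark.conf.set("spark.sql.shuffle.partitions", "20")   # default is 200
print(spark.conf.get("spark.sql.shuffle.partitions"))  # verify the new value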

Ideally, since I have broadcast the small DataFrame, no shuffle should be involved. Yet the join still produces 200 tasks with unevenly distributed records. I expected Spark not to repartition the data, given that I broadcast the table, but the explain plan shows that a SortMergeJoin is being invoked.
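As a point of comparison (my sketch, not from the original post): the broadcast hint marks the build side of a hash join, and for a right outer join Spark can only use the left table as the build side, so a hint on the preserved right side may be ignored and the planner can fall back to SortMergeJoin. The toy DataFrames and column names below (df_small, df_large, src_val, bm_val) are hypothetical stand-ins for df1 and df2.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical stand-ins: a small, broadcastable table and a large one.
df_small = spark.createDataFrame([(1, "a"), (2, "b")], ["ORIGIN", "src_val"])
df_large = spark.createDataFrame([(1, 10.0), (1, 20.0), (3, 30.0)], ["ORIGIN_BM", "bm_val"])

cond = df_small.ORIGIN == df_large.ORIGIN_BM

# Inner join: the hinted (small) side can be used as the build side,
# so the plan is expected to show BroadcastHashJoin.
df_large.join(F.broadcast(df_small), cond, "inner").explain()

# Right outer join preserving the hinted (small) side: the build side
# must be the left table, so the hint may not be honoured and the plan
# can fall back to SortMergeJoin.
df_large.join(F.broadcast(df_small), cond, "right").explain()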

from pyspark.sql import functions as F  # needed for F.broadcast

def HC_LE_BM(BM, HC_LE):
    # Assumes a SparkSession named `spark` is already available in scope.
    spark.conf.set('spark.sql.join.preferSortMergeJoin', 'False')
    print(spark.conf.get('spark.sql.join.preferSortMergeJoin'))
    df1 = HC_LE                        # small DataFrame (~15k rows), to be broadcast
    df2 = BM                           # large DataFrame (~6 million rows)
    df2 = df2.repartition(20).cache()
    print("Print partition", df2.rdd.getNumPartitions())
    print("Storage level", df2.storageLevel)
    BENCH_HEIGHT_OFFSET = 25
    # Right outer join preserving df1, with one equality key and several
    # range conditions on the block extents.
    df3 = df2.join(F.broadcast(df1), (
                        (df1.SRC_DIG_X >= df2.min_x) &
                        (df1.SRC_DIG_X < df2.max_x) &
                        (df1.SRC_DIG_Y >= df2.min_y) &
                        (df1.SRC_DIG_Y < df2.max_y) &
                        (df1.SRC_DIG_Z <= df2.min_z) &
                        ((df1.SRC_DIG_Z + BENCH_HEIGHT_OFFSET) >= df2.max_z) &
                        (df1.ORIGIN == df2.ORIGIN_BM)
                        ),
                    how='right'
                    )\
                .select('*')
    df3.explain(True)
    df3.show(truncate=False)

Please find the relevant log below.

== Physical Plan ==
SortMergeJoin [ORIGIN_BM#6341], [ORIGIN#6395], RightOuter, ((((((SRC_DIG_X#6519 >= min_x#6360) && (SRC_DIG_X#6519 < max_x#6361)) && (SRC_DIG_Y#6520 >= min_y#6362)) && (SRC_DIG_Y#6520 < max_y#6363)) && (SRC_DIG_Z#6521 <= min_z#6364)) && ((SRC_DIG_Z#6521 + 25.0) >= max_z#6365))
:- *(2) Sort [ORIGIN_BM#6341 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(ORIGIN_BM#6341, 200), true
:     +- *(1) Filter ((((((isnotnull(min_z#6364) && isnotnull(max_y#6363)) && isnotnull(max_x#6361)) && isnotnull(max_z#6365)) && isnotnull(min_y#6362)) && isnotnull(min_x#6360)) && isnotnull(ORIGIN_BM#6341))
:        +- InMemoryTableScan [ORIGIN_BM#6341, ORIGIN_LOCATION_BENCH_BM#6342, block_id#6343, centroid_x#6344, centroid_y#6345, centroid_z#6346, dim_x#6347, dim_y#6348, dim_z#6349, volume#6350, rktype_now#6351, rkzone_now#6352, densnow_2013#6353, category_dil#6354, class_dil#6355, btot_dil#6356, as_dil#6357, densnow_2015#6358, densnow_rma#6359, min_x#6360, max_x#6361, min_y#6362, max_y#6363, min_z#6364, max_z#6365], [isnotnull(min_z#6364), isnotnull(max_y#6363), isnotnull(max_x#6361), isnotnull(max_z#6365), isnotnull(min_y#6362), isnotnull(min_x#6360), isnotnull(ORIGIN_BM#6341)]
:              +- InMemoryRelation [ORIGIN_BM#6341, ORIGIN_LOCATION_BENCH_BM#6342, block_id#6343, centroid_x#6344, centroid_y#6345, centroid_z#6346, dim_x#6347, dim_y#6348, dim_z#6349, volume#6350, rktype_now#6351, rkzone_now#6352, densnow_2013#6353, category_dil#6354, class_dil#6355, btot_dil#6356, as_dil#6357, densnow_2015#6358, densnow_rma#6359, min_x#6360, max_x#6361, min_y#6362, max_y#6363, min_z#6364, max_z#6365], StorageLevel(disk, memory, deserialized, 1 replicas)
:                    +- Exchange RoundRobinPartitioning(20), false
:                       +- *(1) FileScan parquet !ri.foundry.main.transaction.00000000-e6aa-5004-bd4a-ac6c2019674d:ri.foundry.main.transaction.00000000-e6aa-5004-bd4a-ac6c2019674d@00000000-4c09-7aa9-9c6c-752a1b9849cd:master.ri.foundry.main.dataset.667a9c8b-dde2-4524-acae-7f6c4183141e[ORIGIN_BM#6341,ORIGIN_LOCATION_BENCH_BM#6342,block_id#6343,centroid_x#6344,centroid_y#6345,centroid_z#6346,dim_x#6347,dim_y#6348,dim_z#6349,volume#6350,rktype_now#6351,rkzone_now#6352,densnow_2013#6353,category_dil#6354,class_dil#6355,btot_dil#6356,as_dil#6357,densnow_2015#6358,densnow_rma#6359,min_x#6360,max_x#6361,min_y#6362,max_y#6363,min_z#6364,max_z#6365] Batched: true, DataFilters: [], Format: Parquet, Location: FoundryCatalogFileIndex[sparkfoundry://.com/datasets/ri.foundry.main.datase..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct...
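The Exchange hashpartitioning(ORIGIN_BM#6341, 200) node above is the shuffle that produces the 200 tasks. As an aside (my sketch, not from the original post), these are the standard Spark SQL settings one would typically inspect when a broadcast hint appears to be ignored:

# Assumes an existing SparkSession named `spark`.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))  # size limit for automatic broadcast
print(spark.conf.get("spark.sql.shuffle.partitions"))          # partition count of the Exchange above
print(spark.conf.get("spark.sql.join.preferSortMergeJoin"))    # set to 'False' in the snippet above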

Tags: python, apache-spark, pyspark, pyspark-sql, hadoop2

Solution

