Left join on one key, then join on a different right-hand key to get the value when there is no match

Problem Description

I have two Spark DataFrames, say df_core and df_dict:

(df_core has more columns, but they are not relevant to this question.)

df_core:

id
1_ghi
2_mno
3_xyz
4_abc

df_dict:

id_1      id_2      cost
1_ghi     1_ghi     12
2_mno     2_rst     86
3_def     3_xyz     105

I want to get the value of df_dict.cost by joining the two DataFrames.

Scenario: join on df_core.id == df_dict.id_1.

If df_core.id does not match the foreign key df_dict.id_1 (e.g. 3_xyz), the join should instead happen on df_dict.id_2.

I was able to implement the join on the first key, but I am not sure how to handle that scenario:

final_df = df_core.alias("df_core_alias").join(df_dict, df_core.id == df_dict.id_1, 'left').select('df_core_alias.*', df_dict.cost)

The solution does not have to be a DataFrame operation. If it is easier and/or better optimized, I can create temp views from the DataFrames and run SQL on them.

I have also thought of an SQL solution (untested):

SELECT
    core.id,
    dict.cost
FROM
    df_core core LEFT JOIN df_dict dict
    ON core.id = dict.id_1
    OR core.id = dict.id_2
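
Roughly, the temp-view route I have in mind would look like this (untested; the view names simply mirror the DataFrame names):

# Register both DataFrames as temp views so the SQL above can reference them
df_core.createOrReplaceTempView("df_core")
df_dict.createOrReplaceTempView("df_dict")

final_df = spark.sql("""
    SELECT core.id, dict.cost
    FROM df_core core
    LEFT JOIN df_dict dict
      ON core.id = dict.id_1
      OR core.id = dict.id_2
""")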

Expected df:

id       cost
1_ghi    12
2_mno    86
3_xyz    105
4_abc

Edit: the plan is too big to add in a comment, so I am posting it here in the question.

Here is the Spark plan for the isin approach:

== Physical Plan ==
*(3) Project [region_type#26, COST#13, CORE_SECTOR_VALUE#21, CORE_ID#22]
+- BroadcastNestedLoopJoin BuildRight, LeftOuter, CORE_ID#22 IN (DICT_ID_1#10,DICT_ID_2#11)
   :- *(1) Project [CORE_SECTOR_VALUE#21, CORE_ID#22, region_type#26]
   :  +- *(1) Filter ((((isnotnull(response_value#23) && isnotnull(error_code#19L)) && (error_code#19L = 0)) && NOT (response_value#23 =  )) && NOT response_value#23 IN (N.A.,N.D.,N.S.))
   :     +- *(1) FileScan parquet [ERROR_CODE#19L,CORE_SECTOR_VALUE#21,CORE_ID#22,RESPONSE_VALUE#23,source_system#24,fee_type#25,region_type#26,run_date#27] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/outfile/..., PartitionCount: 14, PartitionFilters: [isnotnull(run_date#27), (run_date#27 = 20190905)], PushedFilters: [IsNotNull(RESPONSE_VALUE), IsNotNull(ERROR_CODE), EqualTo(ERROR_CODE,0), Not(EqualTo(RESPONSE_VA..., ReadSchema: struct<ERROR_CODE:bigint,CORE_SECTOR_VALUE:string,CORE_ID:string,RESPONSE_VALUE:string>
   +- BroadcastExchange IdentityBroadcastMode
      +- *(2) FileScan csv [DICT_ID_1#10,DICT_ID_2#11,COST#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/client..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DICT_ID_1:string,DICT_ID_2:string,COST:string>

The Filter inside the BroadcastNestedLoopJoin branch comes from earlier transformations on df_core; because of Spark's lazy evaluation, it only shows up now, in this plan.

Also, I just realized that final_df.show() works fine with any of the solutions I have tried. What takes forever to process is the next transformation I run on final_df, which produces my actual expected_df. Here is that next transformation:

expected_df = spark.sql("select region_type, cost, core_sector_value, count(core_id) from final_df_view group by region_type, cost, core_sector_value order by region_type, cost, core_sector_value")
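
(final_df_view is just final_df registered as a temp view. For reference, here is that registration plus a DataFrame-API version of the same aggregation, which should be equivalent; the count alias name is arbitrary:)

from pyspark.sql import functions as F

# Register final_df as the view referenced by the SQL above
final_df.createOrReplaceTempView("final_df_view")

# DataFrame-API equivalent of the group-by / order-by query
expected_df = (final_df
    .groupBy("region_type", "cost", "core_sector_value")
    .agg(F.count("core_id").alias("core_id_count"))
    .orderBy("region_type", "cost", "core_sector_value"))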

And here is the plan for expected_df:

== Physical Plan ==
*(5) Sort [region_type#26 ASC NULLS FIRST, cost#13 ASC NULLS FIRST, core_sector_value#21 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(region_type#26 ASC NULLS FIRST, cost#13 ASC NULLS FIRST, core_sector_value#21 ASC NULLS FIRST, 200)
   +- *(4) HashAggregate(keys=[region_type#26, cost#13, core_sector_value#21], functions=[count(core_id#22)])
      +- Exchange hashpartitioning(region_type#26, cost#13, core_sector_value#21, 200)
         +- *(3) HashAggregate(keys=[region_type#26, cost#13, core_sector_value#21], functions=[partial_count(core_id#22)])
            +- *(3) Project [region_type#26, COST#13, CORE_SECTOR_VALUE#21, CORE_ID#22]
               +- BroadcastNestedLoopJoin BuildRight, LeftOuter, CORE_ID#22 IN (DICT_ID_1#10,DICT_ID_2#11)
                  :- *(1) Project [CORE_SECTOR_VALUE#21, CORE_ID#22, region_type#26]
                  :  +- *(1) Filter ((((isnotnull(response_value#23) && isnotnull(error_code#19L)) && (error_code#19L = 0)) && NOT (response_value#23 =  )) && NOT response_value#23 IN (N.A.,N.D.,N.S.))
                  :     +- *(1) FileScan parquet [ERROR_CODE#19L,CORE_SECTOR_VALUE#21,CORE_ID#22,RESPONSE_VALUE#23,source_system#24,fee_type#25,region_type#26,run_date#27] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/outfile/..., PartitionCount: 14, PartitionFilters: [isnotnull(run_date#27), (run_date#27 = 20190905)], PushedFilters: [IsNotNull(RESPONSE_VALUE), IsNotNull(ERROR_CODE), EqualTo(ERROR_CODE,0), Not(EqualTo(RESPONSE_VA..., ReadSchema: struct<ERROR_CODE:bigint,CORE_SECTOR_VALUE:string,CORE_ID:string,RESPONSE_VALUE:string>
                  +- BroadcastExchange IdentityBroadcastMode
                     +- *(2) FileScan csv [DICT_ID_1#10,DICT_ID_2#11,COST#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/client..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DICT_ID_1:string,DICT_ID_2:string,COST:string>

Looking at this plan, I think it is too heavy for in-memory transformations on a local Spark setup. Is performing so many transformations in separate steps a best practice, or should I try to come up with a single query that contains all the business logic?

Also, could you point me to any resources for understanding the Spark plans we get from the explain() function? Thanks.

Tags: apache-spark, pyspark, apache-spark-sql

Solution


This looks like a case for isin with a left_outer join:

# Final DF will have all columns from df1 and df2
final_df = df1.join(df2, df1.id.isin(df2.id_1, df2.id_2), 'left_outer')
final_df.show()

+-----+-----+-----+----+
|   id| id_1| id_2|cost|
+-----+-----+-----+----+
|1_ghi|1_ghi|1_ghi|  12|
|2_mno|2_mno|2_rst|  86|
|3_xyz|3_def|3_xyz| 105|
|4_abc| null| null|null|
+-----+-----+-----+----+

# Select the required columns like id, cost etc. 
final_df = df1.join(df2, df1.id.isin(df2.id_1, df2.id_2), 'left_outer').select('id','cost')

final_df.show()
+-----+----+
|   id|cost|
+-----+----+
|1_ghi|  12|
|2_mno|  86|
|3_xyz| 105|
|4_abc|null|
+-----+----+
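
A side note on the plans in the question: the isin / OR condition is what forces the BroadcastNestedLoopJoin, because Spark cannot treat it as an equi-join. If that ever becomes a bottleneck, one possible alternative (just a sketch, not tested against your data) is to join on each key separately with plain equi-joins and prefer the id_1 match:

from pyspark.sql import functions as F

# Split the dictionary into two lookup tables, one per key column
cost_1 = df2.select(F.col("id_1").alias("id"), F.col("cost").alias("cost_1"))
cost_2 = df2.select(F.col("id_2").alias("id"), F.col("cost").alias("cost_2"))

# Left-join on each key, then take the id_1 cost if present, else the id_2 cost
final_df = (df1
    .join(cost_1, "id", "left")
    .join(cost_2, "id", "left")
    .withColumn("cost", F.coalesce(F.col("cost_1"), F.col("cost_2")))
    .select("id", "cost"))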

