apache-spark - 如果没有匹配,则在键上左连接,然后在不同的右键上连接以获取值
问题描述
我有两个 spark 数据框,比如 df_core 和 df_dict:
df_core 中有更多的 cols 但这与这里的问题无关
df_core:
id
1_ghi
2_mno
3_xyz
4_abc
df_dict:
id_1 id_2 cost
1_ghi 1_ghi 12
2_mno 2_rst 86
3_def 3_xyz 105
我想通过加入 2 个 dfs 从 df_dict.cost 中获取价值。
场景:加入 df_core.id == df_dict.id_1
如果 df_core.id 与外键 df_dict.id_1 不匹配(例如:3_xyz),则连接应该发生在 df_dict.id_2
我能够实现第一个键的连接,但不确定如何实现该场景
final_df = df_core.alias("df_core_alias").join(df_dict, df_core.id== df_dict.id_1, 'left').select('df_core_alias.*', df_dict.cost)
解决方案不必是数据帧操作。如果这很容易和/或优化,我可以从数据帧中创建临时视图,然后在其上运行 SQL。
我还想到了一个 SQL 解决方案(未经测试):
SELECT
core.id,
dict.cost
FROM
df_core core LEFT JOIN df_dict dict
ON core.id = dict.id_1
OR core.id = dict.id_2
预期的df:
id cost
1_ghi 12
2_mno 86
3_xyz 105
4_abc
那么项目计划太大了,无法在评论中添加,所以我必须在这里提问
以下是isin的火花计划:
== Physical Plan ==
*(3) Project [region_type#26, COST#13, CORE_SECTOR_VALUE#21, CORE_ID#22]
+- BroadcastNestedLoopJoin BuildRight, LeftOuter, CORE_ID#22 IN (DICT_ID_1#10,DICT_ID_2#11)
:- *(1) Project [CORE_SECTOR_VALUE#21, CORE_ID#22, region_type#26]
: +- *(1) Filter ((((isnotnull(response_value#23) && isnotnull(error_code#19L)) && (error_code#19L = 0)) && NOT (response_value#23 = )) && NOT response_value#23 IN (N.A.,N.D.,N.S.))
: +- *(1) FileScan parquet [ERROR_CODE#19L,CORE_SECTOR_VALUE#21,CORE_ID#22,RESPONSE_VALUE#23,source_system#24,fee_type#25,region_type#26,run_date#27] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/outfile/..., PartitionCount: 14, PartitionFilters: [isnotnull(run_date#27), (run_date#27 = 20190905)], PushedFilters: [IsNotNull(RESPONSE_VALUE), IsNotNull(ERROR_CODE), EqualTo(ERROR_CODE,0), Not(EqualTo(RESPONSE_VA..., ReadSchema: struct<ERROR_CODE:bigint,CORE_SECTOR_VALUE:string,CORE_ID:string,RESPONSE_VALUE:string>
+- BroadcastExchange IdentityBroadcastMode
+- *(2) FileScan csv [DICT_ID_1#10,DICT_ID_2#11,COST#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/client..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DICT_ID_1:string,DICT_ID_2:string,COST:string>
BroadcastNestedLoopJoin中的过滤器来自之前的df_core转换,但我们知道 spark 的惰性评估,我们在项目计划中看到它
此外,我刚刚意识到 final_df.show() 适用于我使用的任何解决方案。但是需要无限时间来处理的是我在 final_df 上进行的下一个转换,这是我实际的expected_df。这是我的下一个转变:
expected_df = spark.sql("select region_type, cost, core_sector_value, count(core_id) from final_df_view group by region_type, cost, core_sector_value order by region_type, cost, core_sector_value")
&这里是expected_df的计划:
== Physical Plan ==
*(5) Sort [region_type#26 ASC NULLS FIRST, cost#13 ASC NULLS FIRST, core_sector_value#21 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(region_type#26 ASC NULLS FIRST, cost#13 ASC NULLS FIRST, core_sector_value#21 ASC NULLS FIRST, 200)
+- *(4) HashAggregate(keys=[region_type#26, cost#13, core_sector_value#21], functions=[count(core_id#22)])
+- Exchange hashpartitioning(region_type#26, cost#13, core_sector_value#21, 200)
+- *(3) HashAggregate(keys=[region_type#26, cost#13, core_sector_value#21], functions=[partial_count(core_id#22)])
+- *(3) Project [region_type#26, COST#13, CORE_SECTOR_VALUE#21, CORE_ID#22]
+- BroadcastNestedLoopJoin BuildRight, LeftOuter, CORE_ID#22 IN (DICT_ID_1#10,DICT_ID_2#11)
:- *(1) Project [CORE_SECTOR_VALUE#21, CORE_ID#22, region_type#26]
: +- *(1) Filter ((((isnotnull(response_value#23) && isnotnull(error_code#19L)) && (error_code#19L = 0)) && NOT (response_value#23 = )) && NOT response_value#23 IN (N.A.,N.D.,N.S.))
: +- *(1) FileScan parquet [ERROR_CODE#19L,CORE_SECTOR_VALUE#21,CORE_ID#22,RESPONSE_VALUE#23,source_system#24,fee_type#25,region_type#26,run_date#27] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/outfile/..., PartitionCount: 14, PartitionFilters: [isnotnull(run_date#27), (run_date#27 = 20190905)], PushedFilters: [IsNotNull(RESPONSE_VALUE), IsNotNull(ERROR_CODE), EqualTo(ERROR_CODE,0), Not(EqualTo(RESPONSE_VA..., ReadSchema: struct<ERROR_CODE:bigint,CORE_SECTOR_VALUE:string,CORE_ID:string,RESPONSE_VALUE:string>
+- BroadcastExchange IdentityBroadcastMode
+- *(2) FileScan csv [DICT_ID_1#10,DICT_ID_2#11,COST#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/client..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DICT_ID_1:string,DICT_ID_2:string,COST:string>
看到这个计划,我认为对于 spark local 的内存转换来说太重了。执行这么多不同的步骤转换是最佳实践,还是我应该尝试提出一个包含所有业务逻辑的查询?
此外,您能否直接访问任何资源以了解我们使用 explain() 函数获得的 Spark 计划?谢谢
解决方案
看起来像in
left_outer
操作:
# Final DF will have all columns from df1 and df2
final_df = df1.join(df2, df1.id.isin(df2.id_1, df2.id_2), 'left_outer')
final_df.show()
+-----+-----+-----+----+
| id| id_1| id_2|cost|
+-----+-----+-----+----+
|1_ghi|1_ghi|1_ghi| 12|
|2_mno|2_mno|2_rst| 86|
|3_xyz|3_def|3_xyz| 105|
|4_abc| null| null|null|
+-----+-----+-----+----+
# Select the required columns like id, cost etc.
final_df = df1.join(df2, df1.id.isin(df2.id_1, df2.id_2), 'left_outer').select('id','cost')
final_df.show()
+-----+----+
| id|cost|
+-----+----+
|1_ghi| 12|
|2_mno| 86|
|3_xyz| 105|
|4_abc|null|
+-----+----+
推荐阅读
- sql - 无法加载文件或程序集 microsoft.sqlserver.management.sdk.sfc 版本=11.0.0.0 文化=中性 publickeytoken
- php - Codeigniter 无法从购物车中检索数据
- mysql - 在 SQL 之间选择日期。其中日期的格式为 dd/mm/yyyy 并且是一个字符串
- javascript - Gulp 为什么单个文件类型在括号中不起作用
- python - 使用 beautifulsoup 抓取数据
- jquery - 使用 jQuery 选择多个 ID 时遇到问题
- android - android模板合并build.gradle:底部添加了“应用插件”
- r - R Prophet add_regressor 给出奇怪的结果
- windows - 使用 Windows 批处理文件根据文件名的一部分将文件分类到文件夹中
- c++ - Boost消息队列文件可以重定向到用户指定的位置吗