apache-spark - PYSPARK :将一个表列与另一个表中的两列之一连接起来
问题描述
我的问题如下:
Table 1
ID1 ID2
1 2
3 4
Table 2
C1 VALUE
1 London
4 Texas
Table3
C3 VALUE
2 Paris
3 Arizona
表 1 有主要和次要 ID。我需要创建一个最终输出,它是基于来自 table1 的 Ids 映射的来自 Table2 和 Table3 的值的聚合。
即,如果 table2 或 table3 中的值映射到其中一个 ID,则应将其聚合为一个。
i.e my final output should look like:
ID Aggregated
1 [2, London, Paris] // since Paris is mapped to 2 which is turn is mapped to 1
3 [4, Texas, Arizona] // Texas is mapped to 4 which in turn is mapped to 3
任何建议如何在 pyspark 中实现这一点。
我不确定加入表格是否有助于解决这个问题。
我在想 PairedRDD 可能会帮助我,但我无法提出正确的解决方案。
谢谢
解决方案
下面是一个非常简单的方法:
spark.sql(
"""
select 1 as id1,2 as id2
union
select 3 as id1,4 as id2
""").createOrReplaceTempView("table1")
spark.sql(
"""
select 1 as c1, 'london' as city
union
select 4 as c1, 'texas' as city
""").createOrReplaceTempView("table2")
spark.sql(
"""
select 2 as c1, 'paris' as city
union
select 3 as c1, 'arizona' as city
""").createOrReplaceTempView("table3")
spark.table("table1").show()
spark.table("table2").show()
spark.table("table3").show()
# for simplicity, union table2 and table 3
spark.sql(""" select * from table2 union all select * from table3 """).createOrReplaceTempView("city_mappings")
spark.table("city_mappings").show()
# now join to the ids:
spark.sql("""
select id1, id2, city from table1
join city_mappings on c1 = id1 or c1 = id2
""").createOrReplaceTempView("id_to_city")
# and finally you can aggregate:
spark.sql("""
select id1, id2, collect_list(city)
from id_to_city
group by id1, id2
""").createOrReplaceTempView("result")
table("result").show()
# result looks like this, you can reshape to better suit your needs :
+---+---+------------------+
|id1|id2|collect_list(city)|
+---+---+------------------+
| 1| 2| [london, paris]|
| 3| 4| [texas, arizona]|
+---+---+------------------+
推荐阅读
- c++ - C++ :: 在按下回车键之前返回用户输入的函数
- c# - 我将如何在此 AI 以下代码中实现 navmesh 寻路。C# 统一
- java - 使用嵌套的for循环绘制等边三角形?
- javascript - 嵌套文本的样式不起作用
- java - Spring Boot + JMustache 404 not found error for .html page from /resources/templates 文件夹
- php - 如何使用 sum() 在 mysqli 中添加行
- python - 合并多个大型 DataFrame 的有效方法
- apache-kafka - 使用 SCS 删除消费消息的 kafka 日志
- chart-director - 如何去除 CharDirector ContourLayer 中生成的黑色边框?
- laravel - Laravel 5.5 条件