scala - 从两个 Hive 表的连接创建一个结构数据类型数组
问题描述
我在 Hive 有两张桌子-
emp(empid int,empname string,deptid string)
dept(deptid string, deptname string)
样本数据
Hive 中的 Emp 表具有 schema empid int,empname string,deptid string
1,Monami Sen,D01
2,Tarun Sen,D02
3,Shovik Sen,D03
4, Rita Roy,D02
5,Farhan,D01
Hive 中的 Dept 表具有模式 deptid 字符串、deptname 字符串
D01,Finance
D02,IT
D03,Accounts
D04,Admin
我需要创建另一个应该具有以下架构的配置单元表 -
dept id string, dept name string, emp_details array<struct<emp_id:string,emp_name string>>
struct 属性数组应包含所有员工详细信息 - 属于特定部门的 empid 和 empname,最终数据帧应转换为 JSON 格式。
期望的输出:
{"deptid":"D01","deptname":"IT","empdetails":[{"empid":1,"empname":"Monami Sen"}]}
{"deptid":"D02","deptname":"Accounts","empdetails":[{"empid":2,"empname":"Rita Roy"},
{"empid":5,"empname":"Rijul Shah"}]}
{"deptid":"D03","deptname":"Finance","empdetails":[{"empid":3,"empname":"Shovik Sen"},{"empid":4,"empname":"Arghya Ghosh"}]}
{"deptid":"D04","deptname":"Adminstration","empdetails":[]}
我需要使用 Spark 1.6 版和 Scala 2.10 进行编码。数据集很大,因此需要有效的代码处理才能获得最佳性能。
你能帮我对代码提出任何建议吗?
解决方案
我建议执行left_outer
连接,然后是groupBy/collect_list
聚合和toJSON
转换,如下所示:
val empDF = Seq(
(1, "Monami Sen", "D01"),
(2, "Tarun Sen", "D02"),
(3, "Shovik Sen", "D03"),
(4, "Rita Roy", "D02"),
(5, "Farhan", "D01")
).toDF("empid", "empname", "deptid")
val deptDF = Seq(
("D01", "Finance"),
("D02", "IT"),
("D03", "Accounts"),
("D04", "Admin")
).toDF("deptid", "deptname")
deptDF.join(empDF, Seq("deptid"), "left_outer").
groupBy("deptid", "deptname").
agg(collect_list(struct($"empid", $"empname")).as("empdetails")).
toJSON.
show(false)
// +----------------------------------------------------------------------------------------------------------------------+
// |value |
// +----------------------------------------------------------------------------------------------------------------------+
// |{"deptid":"D03","deptname":"Accounts","empdetails":[{"empid":3,"empname":"Shovik Sen"}]} |
// |{"deptid":"D02","deptname":"IT","empdetails":[{"empid":4,"empname":"Rita Roy"},{"empid":2,"empname":"Tarun Sen"}]} |
// |{"deptid":"D01","deptname":"Finance","empdetails":[{"empid":5,"empname":"Farhan"},{"empid":1,"empname":"Monami Sen"}]}|
// |{"deptid":"D04","deptname":"Admin","empdetails":[{}]} |
// +----------------------------------------------------------------------------------------------------------------------+
对于Spark 1.6
,请考虑通过 Spark SQL 进行聚合(因为collect_list
Spark DataFrame API 中似乎不支持非原始字段类型):
deptDF.join(empDF, Seq("deptid"), "left_outer").
createOrReplaceTempView("joined_table")
val resultDF = sqlContext.sql("""
select deptid, deptname, collect_list(struct(empid, empname)) as empdetails
from joined_table
group by deptid, deptname
""")
resultDF.toJSON.
show(false)
推荐阅读
- java - Generate exe for 32-bit system of Java software
- rxjs - 从不可预测的源 Observable 构建“心跳”Observable
- rest - 如何在 Katalon Studio Rest API 中将变量值传递给 json Path
- javascript - How to prevent infinite re-rendering with useEffect() in React
- ios - 在 SwiftUI 中为视图设置动画以在点击时向上滑动和隐藏
- c - 寻找完美数的程序:输出错误。完美数是因数之和等于给定数的数
- android - How Adding Real time data and give min and max angle to Slice Pie chart
- html - 启用 UTF-8 或使用 TheArtOfDev 的 HtmlRenderer.PdfSharp 设置自定义字体
- javascript - 如何使用 JS 在 CSS 中禁用“悬停”
- python - 对数据库执行多项操作时出错