Create an array of struct data type from a join of two Hive tables

Problem description

I have two tables in Hive -

 emp(empid int,empname string,deptid string)
 dept(deptid string, deptname string)

Sample data

The Emp table in Hive has the schema (empid int, empname string, deptid string):

 1,Monami Sen,D01
 2,Tarun Sen,D02
 3,Shovik Sen,D03
 4,Rita Roy,D02
 5,Farhan,D01

The Dept table in Hive has the schema (deptid string, deptname string):

 D01,Finance
 D02,IT
 D03,Accounts
 D04,Admin

I need to create another Hive table with the following schema -

deptid string, deptname string, empdetails array<struct<empid:string, empname:string>>

The array of structs should contain the employee details - the empid and empname of every employee belonging to that department - and the final dataframe should be converted to JSON format.
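For reference, a minimal sketch of what creating such a target table through a HiveContext could look like (the table name dept_emp is hypothetical):

sqlContext.sql("""
  create table dept_emp (
    deptid string,
    deptname string,
    empdetails array<struct<empid:string, empname:string>>
  )
""")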

Desired output:

{"deptid":"D01","deptname":"IT","empdetails":[{"empid":1,"empname":"Monami Sen"}]}
{"deptid":"D02","deptname":"Accounts","empdetails":[{"empid":2,"empname":"Rita Roy"}, 
{"empid":5,"empname":"Rijul Shah"}]}
{"deptid":"D03","deptname":"Finance","empdetails":[{"empid":3,"empname":"Shovik Sen"},{"empid":4,"empname":"Arghya Ghosh"}]}
{"deptid":"D04","deptname":"Adminstration","empdetails":[]}

I need to code this with Spark 1.6 and Scala 2.10. The dataset is quite large, so the code needs to process it efficiently for good performance.

Could you suggest how to code this?

Tags: scala, apache-spark, hive, bigdata

Solution


I would suggest performing a left_outer join, followed by a groupBy/collect_list aggregation and a toJSON conversion, as shown below:

// outside spark-shell, imports are needed for collect_list/struct and the $-column syntax (Spark 2.x)
import org.apache.spark.sql.functions._
import spark.implicits._

val empDF = Seq(
  (1, "Monami Sen", "D01"),
  (2, "Tarun Sen", "D02"),
  (3, "Shovik Sen", "D03"),
  (4, "Rita Roy", "D02"),
  (5, "Farhan", "D01")
).toDF("empid", "empname", "deptid")

val deptDF = Seq(
  ("D01", "Finance"),
  ("D02", "IT"),
  ("D03", "Accounts"),
  ("D04", "Admin")
).toDF("deptid", "deptname")

deptDF.join(empDF, Seq("deptid"), "left_outer").
  groupBy("deptid", "deptname").
  agg(collect_list(struct($"empid", $"empname")).as("empdetails")).
  toJSON.
  show(false)
// +----------------------------------------------------------------------------------------------------------------------+
// |value                                                                                                                 |
// +----------------------------------------------------------------------------------------------------------------------+
// |{"deptid":"D03","deptname":"Accounts","empdetails":[{"empid":3,"empname":"Shovik Sen"}]}                              |
// |{"deptid":"D02","deptname":"IT","empdetails":[{"empid":4,"empname":"Rita Roy"},{"empid":2,"empname":"Tarun Sen"}]}    |
// |{"deptid":"D01","deptname":"Finance","empdetails":[{"empid":5,"empname":"Farhan"},{"empid":1,"empname":"Monami Sen"}]}|
// |{"deptid":"D04","deptname":"Admin","empdetails":[{}]}                                                                 |
// +----------------------------------------------------------------------------------------------------------------------+
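Note that the department without employees (D04) comes out as [{}] rather than the desired []. Since collect_list skips null entries, one way to get a truly empty array is to null out the struct for unmatched rows - a sketch:

deptDF.join(empDF, Seq("deptid"), "left_outer").
  groupBy("deptid", "deptname").
  agg(collect_list(when($"empid".isNotNull, struct($"empid", $"empname"))).as("empdetails")).
  toJSON.
  show(false)
// D04 should now yield {"deptid":"D04","deptname":"Admin","empdetails":[]}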

For Spark 1.6, consider performing the aggregation via Spark SQL instead, since collect_list does not appear to support non-primitive field types in that version's DataFrame API:

deptDF.join(empDF, Seq("deptid"), "left_outer").
  registerTempTable("joined_table")  // createOrReplaceTempView is Spark 2.0+; Spark 1.6 uses registerTempTable

// note: collect_list in Spark 1.6 SQL requires sqlContext to be a HiveContext
val resultDF = sqlContext.sql("""
  select deptid, deptname, collect_list(struct(empid, empname)) as empdetails
  from joined_table
  group by deptid, deptname
""")

resultDF.toJSON.
  show(false)
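
To persist the result rather than just display it, the JSON lines can be written out as text (in Spark 1.6, toJSON returns an RDD[String]), or the struct-typed dataframe saved as a table. A sketch, where the output path and table name are hypothetical:

resultDF.toJSON.saveAsTextFile("/tmp/dept_emp_json")  // one JSON record per line
// or keep the result as a table:
resultDF.write.saveAsTable("dept_emp")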
