
Problem Description

I have a DataFrame with the following schema:

Input DataFrame

 |-- A: string (nullable = true)
 |-- B_2020: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- x: double (nullable = true)
 |    |    |-- y: double (nullable = true)
 |    |    |-- z: double (nullable = true)
 |-- B_2019: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- x: double (nullable = true)
 |    |    |-- y: double (nullable = true)

I want to merge the 2020 and 2019 columns into a single array-of-structs column by matching on the key value.

Desired schema:

Expected output DataFrame

 |-- A: string (nullable = true)
 |-- B: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- x_this_year: double (nullable = true)
 |    |    |-- y_this_year: double (nullable = true)
 |    |    |-- x_last_year: double (nullable = true)
 |    |    |-- y_last_year: double (nullable = true)
 |    |    |-- z_this_year: double (nullable = true)

I want to merge structs with matching keys. Note also that if a key exists in only one of the 2019 or 2020 arrays, null should fill in the other year's values in the merged column.

Tags: scala, apache-spark, apache-spark-sql

Solution


scala> val df = Seq(
     |   ("ABC", 
     |   Seq(
     |     ("a", 2, 4, 6),
     |     ("b", 3, 6, 9),
     |     ("c", 1, 2, 3)
     |   ),
     |   Seq(
     |     ("a", 4, 8),
     |     ("d", 3, 4)
     |   ))
     | ).toDF("A", "B_2020", "B_2019").select(
     |   $"A",
     |   $"B_2020" cast "array<struct<key:string,x:double,y:double,z:double>>",
     |   $"B_2019" cast "array<struct<key:string,x:double,y:double>>"
     | )
df: org.apache.spark.sql.DataFrame = [A: string, B_2020: array<struct<key:string,x:double,y:double,z:double>> ... 1 more field]

scala> df.printSchema
root
 |-- A: string (nullable = true)
 |-- B_2020: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- x: double (nullable = true)
 |    |    |-- y: double (nullable = true)
 |    |    |-- z: double (nullable = true)
 |-- B_2019: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- x: double (nullable = true)
 |    |    |-- y: double (nullable = true)



scala> df.show(false)
+---+------------------------------------------------------------+------------------------------+
|A  |B_2020                                                      |B_2019                        |
+---+------------------------------------------------------------+------------------------------+
|ABC|[[a, 2.0, 4.0, 6.0], [b, 3.0, 6.0, 9.0], [c, 1.0, 2.0, 3.0]]|[[a, 4.0, 8.0], [d, 3.0, 4.0]]|
+---+------------------------------------------------------------+------------------------------+



scala> val df2020 = df.select($"A", explode($"B_2020") as "this_year").select($"A", 
     | $"this_year.key" as "key", $"this_year.x" as "x_this_year", 
     | $"this_year.y" as "y_this_year", $"this_year.z" as "z_this_year")
df2020: org.apache.spark.sql.DataFrame = [A: string, key: string ... 3 more fields]



scala> val df2019 = df.select($"A", explode($"B_2019") as "last_year").select($"A", 
     | $"last_year.key" as "key", $"last_year.x" as "x_last_year", 
     | $"last_year.y" as "y_last_year")
df2019: org.apache.spark.sql.DataFrame = [A: string, key: string ... 2 more fields]



scala> df2020.show(false)
+---+---+-----------+-----------+-----------+
|A  |key|x_this_year|y_this_year|z_this_year|
+---+---+-----------+-----------+-----------+
|ABC|a  |2.0        |4.0        |6.0        |
|ABC|b  |3.0        |6.0        |9.0        |
|ABC|c  |1.0        |2.0        |3.0        |
+---+---+-----------+-----------+-----------+



scala> df2019.show(false)
+---+---+-----------+-----------+
|A  |key|x_last_year|y_last_year|
+---+---+-----------+-----------+
|ABC|a  |4.0        |8.0        |
|ABC|d  |3.0        |4.0        |
+---+---+-----------+-----------+



scala> val outputDF = df2020.join(df2019, Seq("A", "key"),  "outer").select(
     |   $"A" as "market_name", 
     |   struct($"key", $"x_this_year", $"y_this_year", $"x_last_year", 
     |     $"y_last_year", $"z_this_year") as "cancellation_policy_booking")
outputDF: org.apache.spark.sql.DataFrame = [market_name: string, cancellation_policy_booking: struct<key: string, x_this_year: double ... 4 more fields>]

scala> outputDF.printSchema
root
 |-- market_name: string (nullable = true)
 |-- cancellation_policy_booking: struct (nullable = false)
 |    |-- key: string (nullable = true)
 |    |-- x_this_year: double (nullable = true)
 |    |-- y_this_year: double (nullable = true)
 |    |-- x_last_year: double (nullable = true)
 |    |-- y_last_year: double (nullable = true)
 |    |-- z_this_year: double (nullable = true)


scala> outputDF.show(false)
+-----------+----------------------------+                                      
|market_name|cancellation_policy_booking |
+-----------+----------------------------+
|ABC        |[b, 3.0, 6.0,,, 9.0]        |
|ABC        |[a, 2.0, 4.0, 4.0, 8.0, 6.0]|
|ABC        |[d,,, 3.0, 4.0,]            |
|ABC        |[c, 1.0, 2.0,,, 3.0]        |
+-----------+----------------------------+
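The outputDF above has one merged struct per row, whereas the desired schema has a single row per A with B as an array of structs. A sketch of the remaining step, assuming the df2020 and df2019 DataFrames defined earlier and keeping the question's column names (A, B) rather than the renamed ones:

```scala
import org.apache.spark.sql.functions.{collect_list, struct}

// The outer join keeps keys present in only one year; the missing
// year's fields come through as null automatically, as required.
val mergedDF = df2020.join(df2019, Seq("A", "key"), "outer")
  .select(
    $"A",
    struct($"key", $"x_this_year", $"y_this_year",
           $"x_last_year", $"y_last_year", $"z_this_year") as "merged")
  .groupBy("A")
  .agg(collect_list($"merged") as "B")
```

Note that collect_list does not guarantee element order within the array; apply array_sort (or sort before aggregating) if a stable key order matters.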



