Scala Spark Dataframe: how to explode an Int array and a struct array at the same time

Problem description

I'm new to Scala/Spark, and I'm trying to explode a dataframe that has an array column and an array-of-struct column, so that I end up with neither arrays nor structs.

Here is an example:

case class Area(start_time: String, end_time: String, area: String)

val df = Seq((
  "1", Seq(4,5,6), 
  Seq(Area("07:00", "07:30", "70"), Area("08:00", "08:30", "80"), Area("09:00", "09:30", "90"))
)).toDF("id", "before", "after")
df.printSchema
df.show

df has the following schema:

root
 |-- id: string (nullable = true)
 |-- before: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- after: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- start_time: string (nullable = true)
 |    |    |-- end_time: string (nullable = true)
 |    |    |-- area: string (nullable = true)

The data looks like:

+---+---------+--------------------+
| id|   before|               after|
+---+---------+--------------------+
|  1|[4, 5, 6]|[[07:00, 07:30, 7...|
+---+---------+--------------------+

How can I explode the dataframe so that I get the following schema:

 |-- id: string (nullable = true)
 |-- before: integer (nullable = true)
 |-- after_start_time: string (nullable = true)
 |-- after_end_time: string (nullable = true)
 |-- after_area: string (nullable = true)

The resulting data should have 3 rows and 5 columns:

+---+---------+--------------------+--------------------+----------+
| id|   before|    after_start_time|      after_end_time|after_area|
+---+---------+--------------------+--------------------+----------+
|  1|        4|               07:00|               07:30|        70|
|  1|        5|               08:00|               08:30|        80|
|  1|        6|               09:00|               09:30|        90|
+---+---------+--------------------+--------------------+----------+

I'm using Spark 2.3.0 (where arrays_zip is not available). The only solutions I could find explode either two string arrays or a single array of structs.

Tags: sql, arrays, scala, dataframe, apache-spark

Solution

Use arrays_zip to combine the two arrays, then explode the resulting array column, and use as to rename the columns as needed.

Since arrays_zip is not available in Spark 2.3, create a UDF that does the same thing:

import org.apache.spark.sql.functions.udf

val arrays_zip = udf((before: Seq[Int], after: Seq[Area]) => before.zip(after))
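For intuition, here is what the UDF's `before.zip(after)` does per row, shown on plain Scala collections with no Spark needed: `Seq.zip` pairs elements by position and truncates to the shorter of the two sequences.

```scala
case class Area(start_time: String, end_time: String, area: String)

val before = Seq(4, 5, 6)
val after = Seq(
  Area("07:00", "07:30", "70"),
  Area("08:00", "08:30", "80"),
  Area("09:00", "09:30", "90")
)

// Pairs elements positionally: (4, Area("07:00", ...)), (5, Area("08:00", ...)), ...
val zipped: Seq[(Int, Area)] = before.zip(after)

// zip truncates to the shorter length, so unequal arrays silently drop elements:
val uneven = Seq(1, 2, 3).zip(Seq("a"))  // Seq((1, "a"))
```

Note that because of this truncation, rows where `before` and `after` have different lengths will lose the extra elements rather than fail.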

Execution time with the built-in arrays_zip (Spark 2.4.2): 1146 ms

Execution time with the arrays_zip UDF: 1165 ms

Check the code below.

scala> df.show(false)
+---+---------+------------------------------------------------------------+
|id |before   |after                                                       |
+---+---------+------------------------------------------------------------+
|1  |[4, 5, 6]|[[07:00, 07:30, 70], [08:00, 08:30, 80], [09:00, 09:30, 90]]|
+---+---------+------------------------------------------------------------+


scala> 
df
.select(
    $"id",
    explode(
        arrays_zip($"before",$"after")
        .cast("array<struct<before:int,after:struct<start_time:string,end_time:string,area:string>>>")
    ).as("before_after")
)
.select(
    $"id",
    $"before_after.before".as("before"),
    $"before_after.after.start_time".as("after_start_time"),
    $"before_after.after.end_time".as("after_end_time"),
    $"before_after.after.area"
)
.printSchema

root
 |-- id: string (nullable = true)
 |-- before: integer (nullable = true)
 |-- after_start_time: string (nullable = true)
 |-- after_end_time: string (nullable = true)
 |-- area: string (nullable = true)

Output

scala> 

df
.select(
    $"id",
    explode(
        arrays_zip($"before",$"after")
        .cast("array<struct<before:int,after:struct<start_time:string,end_time:string,area:string>>>")
    ).as("before_after")
)
.select(
    $"id",
    $"before_after.before".as("before"),
    $"before_after.after.start_time".as("after_start_time"),
    $"before_after.after.end_time".as("after_end_time"),
    $"before_after.after.area"
)
.show(false)

+---+------+----------------+--------------+----+
|id |before|after_start_time|after_end_time|area|
+---+------+----------------+--------------+----+
|1  |4     |07:00           |07:30         |70  |
|1  |5     |08:00           |08:30         |80  |
|1  |6     |09:00           |09:30         |90  |
+---+------+----------------+--------------+----+
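As a sanity check outside Spark, the explode-then-flatten step above can be mirrored on plain Scala collections: explode turns the one row holding a 3-element zipped array into 3 rows, and the second select pulls each struct field into its own flat column. FlatRow is a hypothetical case class introduced here only for illustration; it is not part of the original answer.

```scala
case class Area(start_time: String, end_time: String, area: String)

// Hypothetical flat-row shape matching the five output columns.
case class FlatRow(id: String, before: Int, after_start_time: String,
                   after_end_time: String, after_area: String)

val id = "1"
val zipped = Seq(
  (4, Area("07:00", "07:30", "70")),
  (5, Area("08:00", "08:30", "80")),
  (6, Area("09:00", "09:30", "90"))
)

// One output row per zipped pair, with every struct field pulled out.
val rows: Seq[FlatRow] = zipped.map { case (b, a) =>
  FlatRow(id, b, a.start_time, a.end_time, a.area)
}
```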
