首页 > 解决方案 > Spark Scala - 将结构数组拆分为数据框列

问题描述

我有一个包含结构数组的嵌套源 json 文件。结构的数量因行而异,我想使用 Spark(scala)从结构的键/值动态创建新的数据框列,其中键是列名,值是列值。

示例 缩小的 json 记录

{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}

数据框架构

scala> val df = spark.read.json("file:///tmp/nested_test.json")
root
 |-- key1: struct (nullable = true)
 |    |-- key2: struct (nullable = true)
 |    |    |-- key3: string (nullable = true)
 |    |    |-- key4: string (nullable = true)
 |    |    |-- key5: struct (nullable = true)
 |    |    |    |-- key6: string (nullable = true)
 |    |    |    |-- key7: string (nullable = true)
 |    |    |    |-- values: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- value: string (nullable = true)

到目前为止做了什么

df.select(
    ($"key1.key2.key3").as("key3"),
    ($"key1.key2.key4").as("key4"),
    ($"key1.key2.key5.key6").as("key6"),
    ($"key1.key2.key5.key7").as("key7"),
    ($"key1.key2.key5.values").as("values")).
    show(truncate=false)

+----+----+----+----+----------------------------------------------------------------------------+
|key3|key4|key6|key7|values                                                                      |
+----+----+----+----+----------------------------------------------------------------------------+
|AK  |EU  |001 |N   |[[valuesColumn1, 9.876], [valuesColumn2, 1.2345], [valuesColumn3, 8.675309]]|
+----+----+----+----+----------------------------------------------------------------------------+

这里有一个由 3 个结构组成的数组,但是这 3 个结构需要动态地溢出到 3 个单独的列中(3 个的数量可能会有很大差异),我不知道该怎么做。

样本所需的输出

请注意,为数组中的每个数组元素生成了 3 个新列values

+----+----+----+----+-----------------------------------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-----------------------------------------+
|AK  |EU  |001 |N   |9.876        |1.2345        |8.675309    |
+----+----+----+----+-----------------------------------------+

参考

我相信所需的解决方案与此 SO 帖子中讨论的类似,但有两个主要区别:

  1. 在 SO 帖子中,列数被硬编码为 3,但在我的情况下,数组元素的数量是未知的
  2. 列名需要由name列驱动,列值由value.
...
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- value: string (nullable = true)

标签: jsonscalaapache-spark

解决方案


你可以这样做:

val sac = new SparkContext("local[*]", " first Program");
val sqlc = new SQLContext(sac);
import sqlc.implicits._;
import org.apache.spark.sql.functions.split
import scala.math._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.{ min, max }

val json = """{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}"""

val df1 = sqlc.read.json(Seq(json).toDS())

val df2 = df1.select(
    ($"key1.key2.key3").as("key3"),
    ($"key1.key2.key4").as("key4"),
    ($"key1.key2.key5.key6").as("key6"),
    ($"key1.key2.key5.key7").as("key7"),
    ($"key1.key2.key5.values").as("values")
)

val numColsVal = df2
    .withColumn("values_size", size($"values"))
    .agg(max($"values_size"))
    .head()
    .getInt(0)

val finalDFColumns = df2.select(explode($"values").as("values")).select("values.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect.foldLeft(df2.limit(0))((cdf, c) => cdf.withColumn(c, lit(null))).columns
val finalDF = df2.select($"*" +: (0 until numColsVal).map(i => $"values".getItem(i)("value").as($"values".getItem(i)("name").toString)): _*)
finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).show(false)
finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).drop($"values").show(false)

结果最终输出为:

+----+----+----+----+-------------+-------------+-------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-------------+-------------+-------------+
|AK  |EU  |001 |N   |9.876        |1.2345       |8.675309     |
+----+----+----+----+-------------+-------------+-------------+

希望我的问题是对的!

------------ 编辑说明------------

此块获取要为数组结构创建的列数。

val numColsVal = df2
        .withColumn("values_size", size($"values"))
        .agg(max($"values_size"))
        .head()
        .getInt(0)

finalDFColumns是使用所有预期列作为空值输出创建的 DF。

下面的块返回需要从数组结构创建的不同列。

df2.select(explode($"values").as("values")).select("values.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect

下面的块将上述新列与df2初始化为空/空值的其他列组合在一起。

foldLeft(df2.limit(0))((cdf, c) => cdf.withColumn(c, lit(null)))

如果您打印输出,则将这两个块结合起来,您将得到:

+----+----+----+----+------+-------------+-------------+-------------+
|key3|key4|key6|key7|values|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+------+-------------+-------------+-------------+
+----+----+----+----+------+-------------+-------------+-------------+

现在我们已经准备好了结构。我们需要这里对应列的值。下面的块为我们提供了值:

df2.select($"*" +: (0 until numColsVal).map(i => $"values".getItem(i)("value").as($"values".getItem(i)("name").toString)): _*)

结果如下:

+----+----+----+----+--------------------+---------------+---------------+---------------+
|key3|key4|key6|key7|              values|values[0][name]|values[1][name]|values[2][name]|
+----+----+----+----+--------------------+---------------+---------------+---------------+
|  AK|  EU| 001|   N|[[valuesColumn1, ...|          9.876|         1.2345|       8.675309|
+----+----+----+----+--------------------+---------------+---------------+---------------+

现在我们需要像上面第一个块中那样重命名列。所以我们将使用zip函数合并列,然后使用 foldLeft 方法重命名输出列,如下所示:

finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).show(false)

这导致以下结构:

+----+----+----+----+--------------------+-------------+-------------+-------------+
|key3|key4|key6|key7|              values|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+--------------------+-------------+-------------+-------------+
|  AK|  EU| 001|   N|[[valuesColumn1, ...|        9.876|       1.2345|     8.675309|
+----+----+----+----+--------------------+-------------+-------------+-------------+

我们就快到了。我们现在只需要values像这样删除不需要的列:

finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).drop($"values").show(false)

因此导致预期的输出如下 -

+----+----+----+----+-------------+-------------+-------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-------------+-------------+-------------+
|AK  |EU  |001 |N   |9.876        |1.2345       |8.675309     |
+----+----+----+----+-------------+-------------+-------------+

我不确定我是否能够清楚地解释它。但是,如果您尝试破坏上述语句/代码并尝试打印它,您将了解我们在输出之前是如何到达的。您可以在 Internet 上找到此逻辑中使用的不同功能的示例说明。


推荐阅读