json - Spark Scala - 将结构数组拆分为数据框列
问题描述
我有一个包含结构数组的嵌套源 json 文件。结构的数量因行而异,我想使用 Spark(scala)从结构的键/值动态创建新的数据框列,其中键是列名,值是列值。
示例 缩小的 json 记录
{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}
数据框架构
scala> val df = spark.read.json("file:///tmp/nested_test.json")
root
|-- key1: struct (nullable = true)
| |-- key2: struct (nullable = true)
| | |-- key3: string (nullable = true)
| | |-- key4: string (nullable = true)
| | |-- key5: struct (nullable = true)
| | | |-- key6: string (nullable = true)
| | | |-- key7: string (nullable = true)
| | | |-- values: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- name: string (nullable = true)
| | | | | |-- value: string (nullable = true)
到目前为止做了什么
df.select(
($"key1.key2.key3").as("key3"),
($"key1.key2.key4").as("key4"),
($"key1.key2.key5.key6").as("key6"),
($"key1.key2.key5.key7").as("key7"),
($"key1.key2.key5.values").as("values")).
show(truncate=false)
+----+----+----+----+----------------------------------------------------------------------------+
|key3|key4|key6|key7|values |
+----+----+----+----+----------------------------------------------------------------------------+
|AK |EU |001 |N |[[valuesColumn1, 9.876], [valuesColumn2, 1.2345], [valuesColumn3, 8.675309]]|
+----+----+----+----+----------------------------------------------------------------------------+
这里有一个由 3 个结构组成的数组,但是这 3 个结构需要动态地溢出到 3 个单独的列中(3 个的数量可能会有很大差异),我不知道该怎么做。
样本所需的输出
请注意,为数组中的每个数组元素生成了 3 个新列values
。
+----+----+----+----+-----------------------------------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-----------------------------------------+
|AK |EU |001 |N |9.876 |1.2345 |8.675309 |
+----+----+----+----+-----------------------------------------+
参考
我相信所需的解决方案与此 SO 帖子中讨论的类似,但有两个主要区别:
- 在 SO 帖子中,列数被硬编码为 3,但在我的情况下,数组元素的数量是未知的
- 列名需要由
name
列驱动,列值由value
.
...
| | | | |-- element: struct (containsNull = true)
| | | | | |-- name: string (nullable = true)
| | | | | |-- value: string (nullable = true)
解决方案
你可以这样做:
val sac = new SparkContext("local[*]", " first Program");
val sqlc = new SQLContext(sac);
import sqlc.implicits._;
import org.apache.spark.sql.functions.split
import scala.math._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.{ min, max }
val json = """{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}"""
val df1 = sqlc.read.json(Seq(json).toDS())
val df2 = df1.select(
($"key1.key2.key3").as("key3"),
($"key1.key2.key4").as("key4"),
($"key1.key2.key5.key6").as("key6"),
($"key1.key2.key5.key7").as("key7"),
($"key1.key2.key5.values").as("values")
)
val numColsVal = df2
.withColumn("values_size", size($"values"))
.agg(max($"values_size"))
.head()
.getInt(0)
val finalDFColumns = df2.select(explode($"values").as("values")).select("values.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect.foldLeft(df2.limit(0))((cdf, c) => cdf.withColumn(c, lit(null))).columns
val finalDF = df2.select($"*" +: (0 until numColsVal).map(i => $"values".getItem(i)("value").as($"values".getItem(i)("name").toString)): _*)
finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).show(false)
finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).drop($"values").show(false)
结果最终输出为:
+----+----+----+----+-------------+-------------+-------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-------------+-------------+-------------+
|AK |EU |001 |N |9.876 |1.2345 |8.675309 |
+----+----+----+----+-------------+-------------+-------------+
希望我的问题是对的!
------------ 编辑说明------------
此块获取要为数组结构创建的列数。
val numColsVal = df2
.withColumn("values_size", size($"values"))
.agg(max($"values_size"))
.head()
.getInt(0)
finalDFColumns
是使用所有预期列作为空值输出创建的 DF。
下面的块返回需要从数组结构创建的不同列。
df2.select(explode($"values").as("values")).select("values.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect
下面的块将上述新列与df2
初始化为空/空值的其他列组合在一起。
foldLeft(df2.limit(0))((cdf, c) => cdf.withColumn(c, lit(null)))
如果您打印输出,则将这两个块结合起来,您将得到:
+----+----+----+----+------+-------------+-------------+-------------+
|key3|key4|key6|key7|values|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+------+-------------+-------------+-------------+
+----+----+----+----+------+-------------+-------------+-------------+
现在我们已经准备好了结构。我们需要这里对应列的值。下面的块为我们提供了值:
df2.select($"*" +: (0 until numColsVal).map(i => $"values".getItem(i)("value").as($"values".getItem(i)("name").toString)): _*)
结果如下:
+----+----+----+----+--------------------+---------------+---------------+---------------+
|key3|key4|key6|key7| values|values[0][name]|values[1][name]|values[2][name]|
+----+----+----+----+--------------------+---------------+---------------+---------------+
| AK| EU| 001| N|[[valuesColumn1, ...| 9.876| 1.2345| 8.675309|
+----+----+----+----+--------------------+---------------+---------------+---------------+
现在我们需要像上面第一个块中那样重命名列。所以我们将使用zip
函数合并列,然后使用 foldLeft 方法重命名输出列,如下所示:
finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).show(false)
这导致以下结构:
+----+----+----+----+--------------------+-------------+-------------+-------------+
|key3|key4|key6|key7| values|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+--------------------+-------------+-------------+-------------+
| AK| EU| 001| N|[[valuesColumn1, ...| 9.876| 1.2345| 8.675309|
+----+----+----+----+--------------------+-------------+-------------+-------------+
我们就快到了。我们现在只需要values
像这样删除不需要的列:
finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).drop($"values").show(false)
因此导致预期的输出如下 -
+----+----+----+----+-------------+-------------+-------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-------------+-------------+-------------+
|AK |EU |001 |N |9.876 |1.2345 |8.675309 |
+----+----+----+----+-------------+-------------+-------------+
我不确定我是否能够清楚地解释它。但是,如果您尝试破坏上述语句/代码并尝试打印它,您将了解我们在输出之前是如何到达的。您可以在 Internet 上找到此逻辑中使用的不同功能的示例说明。
推荐阅读
- javascript - 使用 ajax 或 javascript 将值从 window.prompt 传递到服务器
- javascript - Bootstrap Typeahead 在我的场景中仅单击两次,需要动态处理 20 行
- jquery - 如何强制我的搜索输入字段忽略非拉丁字符?
- mongodb - mongostat 报告的“命令”操作的数量是多少?
- matlab - 使用 Matlab 在循环内更改循环
- java - 带有文件上传器的 JUnit forRest API 失败并出现 406 错误
- php - 如何替换 Symfony 4 函数中的变量?
- html - 在页面加载开始视图在容器的正确位置,但保留文本 ltr
- python - selenium.common.exceptions.SessionNotCreatedException:消息:无法找到与 GeckoDriver、Selenium 和 Firefox 匹配的一组功能
- python - 无论键的位置如何,都更新字典值