Spark JSON nested array to DataFrame

Problem description

I need to process JSON files with the following schema:

root
 |-- Header: struct (nullable = true)
 |    |-- Format: string (nullable = true)
 |    |-- Version: struct (nullable = true)
 |    |    |-- vfield: string (nullable = true)
 |-- Payload: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Data: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |-- Event: struct (nullable = true)
 |    |    |    |-- eventCount: long (nullable = true)
 |    |    |    |-- eventName: string (nullable = true)

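When I load it into a DataFrame, there is only one Row, but that row holds a large number of Data and Event elements in the Payload array. (An element has either Data or Event, but never both.)

I want to get all the events so that I can run further operations on them, or later load them into a database table, and so on. To do that I need every Payload element that has an Event; I won't need the ones that only have "Data". Ideally I would end up with a DataFrame whose rows contain only the Event members.

Unfortunately, when I try something like this:

df.select("Payload.Event") or df.select(`Payload`).filter(...)

it still filters at the root level, which is not very helpful because the DataFrame has only one row. How can I filter the inner array and get its elements as a separate DataFrame?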

Sample JSON:

{
    "Header": {
        "Version": {
            "vfield": "0.6"
        },
        "Format": "DEFAULT"
    },
    "Payload": [
        {"Data": [
            [0, 1, 2],
            [5, 6]
        ]},

        {"Event": {
            "eventName" : "event1",
            "eventCount": 123
        }},
        {"Event": {
            "eventName" : "event2",
            "eventCount": 124
        }},
        { "Data": [
            [5,8],
            [1,2,6]
        ] }
    ]        
}    

Tags: json, scala, apache-spark, apache-spark-sql

Solution


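Because Payload is of type array, anything you access inside it without explode also comes back as an array: the DataFrame keeps its single row, and Payload.Event is an array of structs rather than one row per event.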
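To make the point concrete, here is a minimal sketch meant for spark-shell or a Scala script. It assumes a local SparkSession and rebuilds the DataFrame from the sample JSON written on one line (the names `json` and `noExplode` are only illustrative). Selecting a field through the array without explode keeps the single row and wraps the field in an array.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.ArrayType

// Assumption: a local session; in spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("payload-no-explode").getOrCreate()
import spark.implicits._

// The sample document from the question, compacted onto a single line.
val json = """{"Header":{"Version":{"vfield":"0.6"},"Format":"DEFAULT"},"Payload":[{"Data":[[0,1,2],[5,6]]},{"Event":{"eventName":"event1","eventCount":123}},{"Event":{"eventName":"event2","eventCount":124}},{"Data":[[5,8],[1,2,6]]}]}"""
val df = spark.read.json(Seq(json).toDS)

// Accessing Event through the Payload array, without explode.
val noExplode = df.select("Payload.Event")

// Event is reported as an array of structs, not a struct.
noExplode.printSchema()

// The DataFrame still has exactly one row.
println(noExplode.count())
```

Any filter applied at this stage is evaluated against that one row, which is why it appears to work only at the root.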

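Change df.select("Payload.Event") to df.withColumn("Payload",explode($"Payload")).select("Payload.Event"). Note that explode takes a Column, so pass $"Payload" rather than the plain string "Payload".

See the code below.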

scala> df.printSchema
root
 |-- Header: struct (nullable = true)
 |    |-- Format: string (nullable = true)
 |    |-- Version: struct (nullable = true)
 |    |    |-- vfield: string (nullable = true)
 |-- Payload: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Data: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |-- Event: struct (nullable = true)
 |    |    |    |-- eventCount: long (nullable = true)
 |    |    |    |-- eventName: string (nullable = true)


scala> df.withColumn("Payload",explode($"Payload")).select("Payload.Event").printSchema
root
 |-- Event: struct (nullable = true)
 |    |-- eventCount: long (nullable = true)
 |    |-- eventName: string (nullable = true)


scala> df.withColumn("Payload",explode($"Payload")).select("Payload.Event.*").printSchema
root
 |-- eventCount: long (nullable = true)
 |-- eventName: string (nullable = true)
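The transcript above shows the schema after explode, but the Data-only elements are still present as rows whose Event is null. A minimal sketch of the full pipeline, assuming a local SparkSession and the sample JSON written on one line (the alias `p` and the name `events` are illustrative, and the null filter is an addition to the answer rather than part of it), could look like this:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// Assumption: a local session; in spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("payload-events").getOrCreate()
import spark.implicits._

// The sample document from the question, compacted onto a single line.
val json = """{"Header":{"Version":{"vfield":"0.6"},"Format":"DEFAULT"},"Payload":[{"Data":[[0,1,2],[5,6]]},{"Event":{"eventName":"event1","eventCount":123}},{"Event":{"eventName":"event2","eventCount":124}},{"Data":[[5,8],[1,2,6]]}]}"""
val df = spark.read.json(Seq(json).toDS)

val events = df
  .select(explode($"Payload").as("p")) // one row per Payload element
  .where($"p.Event".isNotNull)         // drop the elements that only carry Data
  .select("p.Event.*")                 // flatten the struct into top-level columns

// Leaves two rows (event1 and event2) with columns eventCount and eventName.
events.show()
```

When reading a pretty-printed file like the sample from disk instead of a string, the reader will probably need .option("multiLine", true), since by default Spark expects one JSON document per line.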
