Parsing nested JSON structures with varying schemas using the Spark DataFrame or RDD API

Problem

I have many JSON documents with a structure like this:

{
    "parent_id": "parent_id1",
    "devices" : "HERE_IS_STRUCT_SERIALIZED_AS_STRING_SEE BELOW"
}

{
    "0x0034" : { "id": "0x0034", "p1": "p1v1", "p2": "p2v1" },
    "0xAB34" : { "id": "0xAB34", "p1": "p1v2", "p2": "p2v2" },
    "0xCC34" : { "id": "0xCC34", "p1": "p1v3", "p2": "p2v3" },
    "0xFFFF" : { "id": "0xFFFF", "p1": "p1v4", "p2": "p2v4" },
    ....
    "0x0023" : { "id": "0x0023", "p1": "p1vN", "p2": "p2vN" },
}

As you can see, the telemetry developers serialized each element as a property of an object rather than creating an array of objects, and the property names vary by id.

Using the Spark DataFrame or RDD API, I would like to transform it into a table like this:

parent_id1, 0x0034, p1v1, p2v1
parent_id1, 0xAB34, p1v2, p2v2
parent_id1, 0xCC34, p1v3, p2v3
parent_id1, 0xFFFF, p1v4, p2v4
parent_id1, 0x0023, p1v5, p2v5

Here is the sample data:

{
    "parent_1": "parent_v1",
    "devices" : "{ \"0x0034\" : { \"id\": \"0x0034\", \"p1\": \"p1v1\", \"p2\": \"p2v1\" }, \"0xAB34\" : { \"id\": \"0xAB34\", \"p1\": \"p1v2\", \"p2\": \"p2v2\" }, \"0xCC34\" : { \"id\": \"0xCC34\", \"p1\": \"p1v3\", \"p2\": \"p2v3\" }, \"0xFFFF\" : { \"id\": \"0xFFFF\", \"p1\": \"p1v4\", \"p2\": \"p2v4\" }, \"0x0023\" : { \"id\": \"0x0023\", \"p1\": \"p1vN\", \"p2\": \"p2vN\" }}"
}

{
    "parent_2": "parent_v1",
    "devices" : "{ \"0x0045\" : { \"id\": \"0x0045\", \"p1\": \"p1v1\", \"p2\": \"p2v1\" }, \"0xC5C1\" : { \"id\": \"0xC5C1\", \"p1\": \"p1v2\", \"p2\": \"p2v2\" }}"
}

Desired output

parent_id1, 0x0034, p1v1, p2v1
parent_id1, 0xAB34, p1v2, p2v2
parent_id1, 0xCC34, p1v3, p2v3
parent_id1, 0xFFFF, p1v4, p2v4
parent_id1, 0x0023, p1v5, p2v5

parent_id2, 0x0045, p1v1, p2v1
parent_id2, 0xC5C1, p1v2, p2v2

I thought about passing devices to the from_json function and then somehow converting the returned object into a JSON array and exploding it... but from_json wants a schema as input, and the schema tends to vary...

Tags: apache-spark, apache-spark-sql, databricks, azure-databricks

Solution


There may be a more Pythonic or Spark-idiomatic way to do this, but this works for me:

Input data

data = {
    "parent_id": "parent_v1",
    "devices" : "{ \"0x0034\" : { \"id\": \"0x0034\", \"p1\": \"p1v1\", \"p2\": \"p2v1\" }, \"0xAB34\" : { \"id\": \"0xAB34\", \"p1\": \"p1v2\", \"p2\": \"p2v2\" }, \"0xCC34\" : { \"id\": \"0xCC34\", \"p1\": \"p1v3\", \"p2\": \"p2v3\" }, \"0xFFFF\" : { \"id\": \"0xFFFF\", \"p1\": \"p1v4\", \"p2\": \"p2v4\" }, \"0x0023\" : { \"id\": \"0x0023\", \"p1\": \"p1vN\", \"p2\": \"p2vN\" }}"
}

Get a DataFrame

import json

def get_df_from_json(json_data):
    # Parse the serialized `devices` string into a nested dict
    json_data['devices'] = json.loads(json_data['devices'])
    list_of_dicts = []
    for device_name, device_details in json_data['devices'].items():
        # One flat row per device: parent id, device name, then the device fields
        row = {
            "parent_id": json_data['parent_id'],
            "device": device_name
        }
        for key in device_details.keys():
            row[key] = device_details[key]
        list_of_dicts.append(row)
    # spark.read.json expects JSON strings, so serialize each row explicitly;
    # multiLine only matters when reading files, so it is dropped here
    return spark.read.json(sc.parallelize([json.dumps(r) for r in list_of_dicts]))

display(get_df_from_json(data))

Output

+--------+--------+------+------+-----------+
| device |   id   |  p1  |  p2  | parent_id |
+--------+--------+------+------+-----------+
| 0x0034 | 0x0034 | p1v1 | p2v1 | parent_v1 |
| 0x0023 | 0x0023 | p1vN | p2vN | parent_v1 |
| 0xFFFF | 0xFFFF | p1v4 | p2v4 | parent_v1 |
| 0xCC34 | 0xCC34 | p1v3 | p2v3 | parent_v1 |
| 0xAB34 | 0xAB34 | p1v2 | p2v2 | parent_v1 |
+--------+--------+------+------+-----------+
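The loop above runs on the driver for a single record. For many records, the same flattening can be expressed per record and pushed into the cluster via an RDD transformation. A sketch of that variant (the function name `explode_record` is my own):

```python
import json

def explode_record(record):
    """Flatten one raw record (a dict) into one dict per device.

    Pure-Python version of the per-device loop, suitable for use
    inside rdd.flatMap so no data is collected to the driver.
    """
    devices = json.loads(record["devices"])
    return [
        # Merge the parent id and device name with the device's own fields
        {"parent_id": record["parent_id"], "device": name, **details}
        for name, details in devices.items()
    ]
```

Given an RDD of parsed records, something like `spark.read.json(raw_rdd.flatMap(explode_record).map(json.dumps))` should then yield the same table in a distributed fashion.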
