PySpark: reading DynamoDB-format JSON

Problem description

I'm not a Spark expert, so I'm asking for help here.

I used the built-in export service to migrate a DynamoDB table to S3. It saves the files in *.json format. Below is an example row (each row of data is a dictionary nested under the key "Item"):

    {
    "Item": {
        "accept_languages":       {
            "M": {
                "en":    {"N": "0.9"},
                "en-US": {"N": "1"}
            }
        },
        "accept_mimetypes":       {
            "M": {
                "*/*":        {"N": "0.8"},
                "image/*":    {"N": "1"},
                "image/apng": {"N": "1"},
                "image/webp": {"N": "1"}
            }
        },
        "id":                     {"S": "5cddbd53b870c2619f1083ed"},
        "ip":                     {"S": "11.11.111.11"},
        "landing_page__type":     {"S": "PageMain"},
        "location__city":         {"S": "Scituate"},
        "location__country":      {"S": "United States"},
        "location__country_code": {"S": "US"},
        "location__region":       {"S": "MA"},
        "location__zip":          {"S": "02066"},
        "origin_url":             {"S": "https://www.bing.com/"},
        "session":                {"S": "b4d58fd18"},
        "source":                 {"S": "bing"},
        "user_agent__browser":    {"S": "Chrome"},
        "user_device":            {"S": "t"}
    }
}

As we can see, every row of data is nested. I want to produce a *.csv file from it. Any suggestions on how to parse it? I currently have a UDF (user-defined function) that converts the DynamoDB dict into a regular view. How, for example, can I extract the data from each row and apply that function to it?

Thanks

Tags: amazon-web-services, apache-spark, pyspark, amazon-dynamodb

Solution


The idea (adapted from this answer) is to recursively collect all column names into a list, and then use that list in a select statement:

from pyspark.sql import functions as F
from pyspark.sql import types as T

# multiLine is needed because each exported JSON record spans several lines
df = spark.read.option("multiLine", "true").json(<filename>)

def flatten(schema, prefix=None):
    """Recursively yield a Column for every leaf field, aliased with its full dotted path."""
    for field in schema.fields:
        colName = field.name if prefix is None else prefix + "." + field.name
        if isinstance(field.dataType, T.StructType):
            yield from flatten(field.dataType, colName)
        else:
            yield F.col(colName).alias(colName.replace(".", "_"))

df.select(list(flatten(df.schema))).show()

Output:

+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
|Item_accept_languages_M_en_N|Item_accept_languages_M_en-US_N|Item_accept_mimetypes_M_*/*_N|Item_accept_mimetypes_M_image/*_N|Item_accept_mimetypes_M_image/apng_N|Item_accept_mimetypes_M_image/webp_N|           Item_id_S|   Item_ip_S|Item_landing_page__type_S|Item_location__city_S|Item_location__country_S|Item_location__country_code_S|Item_location__region_S|Item_location__zip_S|   Item_origin_url_S|Item_session_S|Item_source_S|Item_user_agent__browser_S|Item_user_device_S|
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
|                         0.9|                              1|                          0.8|                                1|                                   1|                                   1|5cddbd53b870c2619...|11.11.111.11|                 PageMain|             Scituate|           United States|                           US|                     MA|               02066|https://www.bing....|     b4d58fd18|         bing|                    Chrome|                 t|
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
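The flattened names still carry the DynamoDB type-descriptor segments (`S`, `N`, `M`, …). As an optional follow-up (my own sketch, not part of the original answer), those segments can be stripped so the CSV headers read naturally; the caveat is that a real attribute literally named `S` or `N` would also be removed:

```python
# DynamoDB attribute-type markers that appear as path segments in the
# flattened column names (sketch; not part of the original answer).
DYNAMO_TYPES = {"S", "N", "M", "B", "BOOL", "L", "SS", "NS", "BS", "NULL"}

def clean_name(col_name):
    """Drop the leading 'Item' and any type-descriptor segments from a flattened name."""
    parts = [p for p in col_name.split("_") if p not in DYNAMO_TYPES]
    if parts and parts[0] == "Item":
        parts = parts[1:]
    return "_".join(parts)

clean_name("Item_accept_languages_M_en_N")  # -> "accept_languages_en"
clean_name("Item_location__city_S")         # -> "location__city"
```

Applied via `.alias(clean_name(...))` inside the generator above, this would give headers like `accept_languages_en` instead of `Item_accept_languages_M_en_N`.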

This dataframe can then be saved as a flat CSV.
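For a quick Spark-free sanity check (my own sketch, not part of the original answer), the same recursive flattening can be run on a single exported row with plain Python and written out with the standard `csv` module:

```python
import csv
import io

def flatten_item(d, prefix=None):
    """Recursively yield (flat_column_name, value) pairs from a nested dict."""
    for key, value in d.items():
        name = key if prefix is None else prefix + "_" + key
        if isinstance(value, dict):
            yield from flatten_item(value, name)
        else:
            yield name, value

# One truncated example row from the DynamoDB export
row = {"Item": {"id": {"S": "5cddbd53b870c2619f1083ed"},
                "accept_languages": {"M": {"en": {"N": "0.9"}}}}}
flat = dict(flatten_item(row))
# flat == {"Item_id_S": "5cddbd53b870c2619f1083ed",
#          "Item_accept_languages_M_en_N": "0.9"}

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=list(flat))
writer.writeheader()
writer.writerow(flat)
print(buf.getvalue())
```

This mirrors what the Spark version does per row, so it is a cheap way to verify the expected column names before launching the full job.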
