amazon-web-services - PySpark 读取 DynamoDB 格式的 json
问题描述
我不是火花专业人士,所以请寻求帮助。
我使用内置服务从 DynamoDB 表迁移到 S3。它以 * .json格式保存文件。假设下面我们有一个行示例(每行数据都是嵌套在键“Item”下的字典)。
{
"Item": {
"accept_languages": {
"M": {
"en": {"N": "0.9"},
"en-US": {"N": "1"}
}
},
"accept_mimetypes": {
"M": {
"*/*": {"N": "0.8"},
"image/*": {"N": "1"},
"image/apng": {"N": "1"},
"image/webp": {"N": "1"}
}
},
"id": {"S": "5cddbd53b870c2619f1083ed"},
"ip": {"S": "11.11.111.11"},
"landing_page__type": {"S": "PageMain"},
"location__city": {"S": "Scituate"},
"location__country": {"S": "United States"},
"location__country_code": {"S": "US"},
"location__region": {"S": "MA"},
"location__zip": {"S": "02066"},
"origin_url": {"S": "https://www.bing.com/"},
"session": {"S": "b4d58fd18"},
"source": {"S": "bing"},
"user_agent__browser": {"S": "Chrome"},
"user_device": {"S": "t"}
}
}
正如我们所见,每一行数据都是嵌套的。我想创建一个 * .csv文件作为它的结果。有什么建议我可以解析它吗?目前我有一个 UDF(自定义函数)来将 dict 本身从 DynamoDB 转换为常规视图。例如,我如何从每一行中提取数据并将该函数应用于它。
谢谢
解决方案
这个想法(从这个答案select
中采用)是递归地收集列表中的所有列名,然后在语句中使用这个列表:
from pyspark.sql import functions as F
from pyspark.sql import types as T
df = spark.read.option("multiLine", "true").json(<filename>)
def flatten(schema, prefix=None):
for field in schema.fields:
if prefix is None:
colName = field.name
else:
colName = prefix + "." + field.name
if isinstance(field.dataType,T.StructType):
yield from flatten(field.dataType, colName)
else:
yield F.col(colName).alias(colName.replace(".", "_"))
df.select(list(flatten(df.schema))).show()
输出:
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
|Item_accept_languages_M_en_N|Item_accept_languages_M_en-US_N|Item_accept_mimetypes_M_*/*_N|Item_accept_mimetypes_M_image/*_N|Item_accept_mimetypes_M_image/apng_N|Item_accept_mimetypes_M_image/webp_N| Item_id_S| Item_ip_S|Item_landing_page__type_S|Item_location__city_S|Item_location__country_S|Item_location__country_code_S|Item_location__region_S|Item_location__zip_S| Item_origin_url_S|Item_session_S|Item_source_S|Item_user_agent__browser_S|Item_user_device_S|
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
| 0.9| 1| 0.8| 1| 1| 1|5cddbd53b870c2619...|11.11.111.11| PageMain| Scituate| United States| US| MA| 02066|https://www.bing....| b4d58fd18| bing| Chrome| t|
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
然后可以将此数据框保存为平面 csv。
推荐阅读
- scala - 如何在 Scala、xtract 库中使用重复标签解析 XML?
- python - 如何从元组中选择最多 3 个值?
- r - 通过转换为低的字段连接两个远程表
- php - 服务器更改后 PHP opendir/get_files 不起作用
- ruby - Cocoapods Traceback 在 41 个 gem(s) (Gem::MissingSpecError) 中找不到 'json' (>= 1.5.1)
- python - 如何按升序显示 Y 上的值,但保存它们的索引。Matplotlib
- java - 在不知道密钥但知道明文将包含的单词之一的情况下解密移位密码的方法-Java
- c - 带哨兵的链表
- javascript - 在 for() 循环中编辑变量的值
- php - wp_nav_menu 如何将整个子菜单包装在 div 中