json - 如何有效地加载和处理包含不同的、不断发展的模式的 JSON 文件
问题描述
我有一个复杂的数据问题,为了论证,不能修改。这是来自数据库转储的虚拟示例 JSON 文件:
{"payload": {"table": "Table1", "data": {"colA": 1, "colB": 2}}}
{"payload": {"table": "Table2", "data": {"colA": 1, "colC": 2}}}
所以我在目录中的每个文件中都有来自多个表的数据,每个表都有自己的模式。随着在上游添加列,架构会随着时间而改变,因此我们无法定义静态架构并依赖于架构推断。
这是我当前的工作流程(高级):
- 将整个 JSON 数据目录加载到一个数据帧中
- 查找这批更改中存在的唯一表
- 对于每个表,将数据框过滤到仅该表数据
- 读取表子集的模式(表 1 为 A+B,表 2 为 A+C)
- 做一些验证
- 将记录与目的地合并
示例代码:
import pyspark.sql.functions as F
df = spark.read.text(directory)
with_table_df = (
df
.withColumn("table", F.get_json_object('value', '$.payload.table'))
.withColumn("json_payload_data", F.get_json_object('value', '$.payload.data'))
)
unique_tables = with_table_df.select('table').distinct().rdd.map(lambda r: r[0]).collect()
for table in unique_tables:
filtered_df = with_table_df.filter(f"table = '{table}'")
table_schema = spark.read.json(filtered_df.rdd.map(lambda row: row.json_payload_data)).schema
changes_df = (
filtered_df
.withColumn('payload_data', F.from_json('json_payload_data', table_schema))
.select('payload_data.*')
)
# do some validation
if valid:
changes_df.write.mode("append").option("mergeSchema", "true").saveAsTable(target_table)
我的问题是我无法加载目录,spark.read.json()
因为它会将超集模式应用于所有记录,并且我将无法确定哪些列是Table1
列,哪些是Table2
列。现在我作为文本加载,提取关键 JSON 元素 ( payload.table
),然后仅在我有相同模式的记录时解析为 JSON。这将起作用,但会给驱动程序节点带来很多负载。
但我不认为过滤和迭代 Dataframe 行是一个好方法。我想以某种方式利用将验证/选择逻辑映射到执行程序节点,但由于使用创建 JSON 模式的方式(无法序列化到驱动程序节点)foreachPartition
,我无法做到这一点。spark.read.json()
我怎样才能重新设计它以更适合 Spark 架构?
更新:
我希望修改数据创建过程,以便 JSON 文件按表分区,这样我就可以简单地spark.read.json(table_path)
为每个唯一路径
解决方案
您的架构没有什么不同。你有一个"payload"
带有"table"
字符串和"data"
映射的结构
如果您对如何定义 Schema 感到困惑"data"
,请参阅MapType
data_schema = MapType(StringType(), IntegerType(), False)
payload_schema = StructType(
[
StructField("table", StringType(), False),
StructField("data", data_schema, False),
]
)
schema = StructType(
[
StructField("payload", payload_schema, False),
]
)
推荐阅读
- angular - Angular 将 API 结果从一个组件发送到不同路由中的另一个组件
- amazon-web-services - 有关 AWS EIP 分配日期和 Boto3 的元数据
- python - Django 表单没有创建新条目
- octave - Matlab二分法
- powershell - Azure BACPAC 导出从 PowerShell 脚本随机失败
- php - 发生数据库错误:您必须使用“set”方法来更新条目
- r - Funs() 和相关错误
- java - 当 JTextArea 为空时,我的代码计算错误的字数
- python - 将简单的 Python 脚本部署到 Google Cloud
- java - Mockito spy 一个接口,模拟它的默认方法,得到 NullPointerException