首页 > 解决方案 > 如何有效地加载和处理包含不同的、不断发展的模式的 JSON 文件

问题描述

我有一个复杂的数据问题,为了论证,不能修改。这是来自数据库转储的虚拟示例 JSON 文件:

{"payload": {"table": "Table1", "data": {"colA": 1, "colB": 2}}}
{"payload": {"table": "Table2", "data": {"colA": 1, "colC": 2}}}

所以我在目录中的每个文件中都有来自多个表的数据,每个表都有自己的模式。随着在上游添加列,架构会随着时间而改变,因此我们无法定义静态架构并依赖于架构推断。

这是我当前的工作流程(高级):

示例代码:

import pyspark.sql.functions as F

df = spark.read.text(directory)
with_table_df = (
    df
    .withColumn("table", F.get_json_object('value', '$.payload.table'))
    .withColumn("json_payload_data", F.get_json_object('value', '$.payload.data'))
)
unique_tables = with_table_df.select('table').distinct().rdd.map(lambda r: r[0]).collect()

for table in unique_tables:
    filtered_df = with_table_df.filter(f"table = '{table}'")
    table_schema = spark.read.json(filtered_df.rdd.map(lambda row: row.json_payload_data)).schema

    changes_df = (
        filtered_df
        .withColumn('payload_data', F.from_json('json_payload_data', table_schema))
        .select('payload_data.*')
    )

    # do some validation
    if valid:
        changes_df.write.mode("append").option("mergeSchema", "true").saveAsTable(target_table)

我的问题是我无法加载目录,spark.read.json()因为它会将超集模式应用于所有记录,并且我将无法确定哪些列是Table1列,哪些是Table2列。现在我作为文本加载,提取关键 JSON 元素 ( payload.table),然后仅在我有相同模式的记录时解析为 JSON。这将起作用,但会给驱动程序节点带来很多负载。

但我不认为过滤和迭代 Dataframe 行是一个好方法。我想以某种方式利用将验证/选择逻辑映射到执行程序节点,但由于使用创建 JSON 模式的方式(无法序列化到驱动程序节点)foreachPartition,我无法做到这一点。spark.read.json()

我怎样才能重新设计它以更适合 Spark 架构?

更新:

我希望修改数据创建过程,以便 JSON 文件按表分区,这样我就可以简单地spark.read.json(table_path)为每个唯一路径

标签: jsonapache-sparkpysparkapache-spark-sql

解决方案


您的架构没有什么不同。你有一个"payload"带有"table"字符串和"data"映射的结构

如果您对如何定义 Schema 感到困惑"data",请参阅MapType

data_schema = MapType(StringType(), IntegerType(), False)
payload_schema = StructType(
    [
        StructField("table", StringType(), False),
        StructField("data", data_schema, False),
    ]
)
schema = StructType(
    [
        StructField("payload", payload_schema, False),
    ]
)

推荐阅读