Update nested Json with another nested Json using Python

Problem Description

For example, I have a full set of nested JSON, and I need to update it with the latest values from another nested JSON.

Can anyone help me with this?

I want to implement it in PySpark.

The full JSON looks like this:

{
    "email": "abctest@xxx.com", 
    "firstName": "name01", 
    "id": 6304,
    "surname": "Optional",
    "layer01": {
        "key1": "value1", 
        "key2": "value2", 
        "key3": "value3", 
        "key4": "value4", 
        "layer02": {
            "key1": "value1", 
            "key2": "value2"
        }, 
        "layer03": [
            {
                "inner_key01": "inner value01"
            }, 
            {
                "inner_key02": "inner_value02"
            }
        ]
    }, 
    "surname": "Required only$uid"
}

The latest JSON looks like this:

{
    "email": "test@xxx.com", 
    "firstName": "name01", 
    "surname": "Optional",
    "id": 6304,
    "layer01": {
        "key1": "value1", 
        "key2": "value2", 
        "key3": "value3", 
        "key4": "value4", 
        "layer02": {
            "key1": "value1_changedData", 
            "key2": "value2"
        }, 
        "layer03": [
            {
                "inner_key01": "inner value01"
            }, 
            {
                "inner_key02": "inner_value02"
            }
        ]
    }, 
    "surname": "Required only$uid"
}

In the above, for id=6304 we have received updates to layer01.layer02.key1 and to the email address.

So I need to apply these updated values to the full JSON. Please help me.

Tags: python-3.x, apache-spark, pyspark, pyspark-dataframes

Solution

You can load the two JSON files into Spark DataFrames, then do a left join to pull the updates from the latest JSON data:

from pyspark.sql import functions as F

# Load both files; multiLine=True because each file holds a single
# pretty-printed JSON object.
full_json_df = spark.read.json(full_json_path, multiLine=True)
latest_json_df = spark.read.json(latest_json_path, multiLine=True)

# Left join on id: keep every row from the full JSON and, for each
# column, take the latest value when a matching id exists.
updated_df = full_json_df.alias("full").join(
    latest_json_df.alias("latest"),
    F.col("full.id") == F.col("latest.id"),
    "left"
).select(
    F.col("full.id"),
    *[
        F.when(F.col("latest.id").isNotNull(), F.col(f"latest.{c}")).otherwise(F.col(f"full.{c}")).alias(c)
        for c in full_json_df.columns if c != 'id'
    ]
)

updated_df.show(truncate=False)

#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
#|id  |email       |firstName|layer01                                                                                              |surname |
#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
#|6304|test@xxx.com|name01   |[value1, value2, value3, value4, [value1_changedData, value2], [[inner value01,], [, inner_value02]]]|Optional|
#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
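
If you need to persist the merged result, you can write the updated DataFrame back out as JSON. A minimal sketch, assuming a hypothetical destination directory output_path:

# Write the updated rows back out as JSON; coalesce(1) produces a
# single part file. `output_path` is a hypothetical destination.
updated_df.coalesce(1).write.mode("overwrite").json(output_path)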

Update:

If the schema changes between the full JSON and the latest JSON, you can load both files into the same DataFrame (so that their schemas are merged) and then deduplicate per id:

from pyspark.sql import Window
from pyspark.sql import functions as F

merged_json_df = spark.read.json("/path/to/{full_json.json,latest_json.json}", multiLine=True)

# order priority: latest file then full
w = Window.partitionBy(F.col("id")).orderBy(F.when(F.input_file_name().like('%latest%'), 0).otherwise(1))

# keep only the highest-priority row per id (the latest file wins
# whenever it contains that id)
updated_df = merged_json_df.withColumn("rn", F.row_number().over(w))\
    .filter("rn = 1")\
    .drop("rn")

updated_df.show(truncate=False)
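
As a quick sanity check (based only on the sample data above), the row for id 6304 should now carry the values from the latest JSON:

# Spot-check the fields that changed in the latest JSON: for id 6304,
# email should be "test@xxx.com" and layer01.layer02.key1 should be
# "value1_changedData".
updated_df.select("id", "email", "layer01.layer02.key1").show(truncate=False)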
