首页 > 解决方案 > 展平 Json 键,Pyspark 中的值

问题描述

在具有 2 列和 2 条记录的表中:

记录 1:第 1 列 - my_col 值为:{"XXX": ["123","456"],"YYY": ["246","135"]}第 2 列 - ID 为A123

记录 2:第 1 列 - my_col 值为:{"XXX": ["123","456"],"YYY": ["246","135"], "ZZZ":["333","444"]}和第 2 列 - ID 为B222

需要使用 pyspark 解析/展平

期待 :

钥匙 价值 ID
XXX 123 A123
XXX 456 A123
年年 246 A123
年年 135 A123
ZZZ 333 B222
ZZZ 444 B222

标签: pysparkapache-spark-sql

解决方案


如果您的列是字符串,您可以使用from_jsonandcustom_schema将其转换为 a MapType,然后再将其explode提取为所需的结果。我假设您的初始列已命名my_col,并且您的数据位于名为input_df.

一个例子如下所示

方法一:使用 pyspark api

from pyspark.sql import functions as F
from pyspark.sql import types as T

custom_schema = T.MapType(T.StringType(),T.ArrayType(T.StringType()))

output_df = (
    input_df.select(
        F.from_json(F.col('my_col'),custom_schema).alias('my_col_json')
    )
    .select(F.explode('my_col_json'))
    .select(
        F.col('key'),
        F.explode('value')
    )
)

方法二:使用 spark sql

# Step 1 : Create a temporary view that may be queried
input_df.createOrReplaceTempView("input_df")
# Step 2: Run the following sql on your spark session
output_df = sparkSession.sql("""
SELECT
    key,
    EXPLODE(value)
FROM (
    SELECT
        EXPLODE(from_json(my_col,"MAP<STRING,ARRAY<STRING>>"))
    FROM
        input_df
) t
""")

对于 json 列

如果已经 json

from pyspark.sql import functions as F

output_df = (
    input_df.select(F.explode('my_col_json'))
    .select(
        F.col('key'),
        F.explode('value')
    )
)

或者

# Step 1 : Create a temporary view that may be queried
input_df.createOrReplaceTempView("input_df")
# Step 2: Run the following sql on your spark session
output_df = sparkSession.sql("""
SELECT
    key,
    EXPLODE(value)
FROM (
    SELECT
        EXPLODE(my_col)
    FROM
        input_df
) t
""")

让我知道这是否适合您。


推荐阅读