首页 > 解决方案 > 在火花中展平json字符串

问题描述

我在火花中有以下数据框:

root
 |-- user_id: string (nullable = true)
 |-- payload: string (nullable = true)

其中payload是一个没有固定模式的json字符串,下面是一些示例数据:

{'user_id': '001','payload': '{"country":"US","time":"11111"}'}
{'user_id': '002','payload': '{"message_id":"8936716"}'}
{'user_id': '003','payload': '{"brand":"adidas","when":""}'}

我想用扁平化的有效负载以json格式输出上述数据(基本上只是从有效负载中提取键值对并将它们放入根级别),例如:

{'user_id': '001','country':'US','time':'11111'}
{'user_id': '002','message_id':'8936716'}
{'user_id': '003','brand':'adidas','when':''}

Stackoverflow 说这是对Flatten Nested Spark Dataframe的重复问题,但事实并非如此。这里的区别在于,在我的情况下,有效负载的值只是字符串类型。

标签: jsonapache-sparkpysparkapache-spark-sql

解决方案


您可以将负载 JSON 解析为 amap<string,string>并将其添加user_id到负载中:

import pyspark.sql.functions as F

# input dataframe
df.show(truncate=False)
+-------+-------------------------------+
|user_id|payload                        |
+-------+-------------------------------+
|001    |{"country":"US","time":"11111"}|
|002    |{"message_id":"8936716"}       |
|003    |{"brand":"adidas","when":""}   |
+-------+-------------------------------+

df2 = df.select(
    F.to_json(
        F.map_concat(
            F.create_map(F.lit('user_id'), F.col('user_id')), 
            F.from_json('payload', 'map<string,string>')
        )
    ).alias('out')
)

df2.show(truncate=False)
+-----------------------------------------------+
|out                                            |
+-----------------------------------------------+
|{"user_id":"001","country":"US","time":"11111"}|
|{"user_id":"002","message_id":"8936716"}       |
|{"user_id":"003","brand":"adidas","when":""}   |
+-----------------------------------------------+

要将其写入 JSON 文件,您可以执行以下操作:

df2.coalesce(1).write.text('filepath')

推荐阅读