首页 > 解决方案 > PySpark 将 JSON 字符串分解为多列

问题描述

我有一个带有字符串数据类型列的数据框。该字符串表示一个返回 json 的 api 请求。

df = spark.createDataFrame([
           ("[{original={ranking=1.0, input=top3}, response=[{to=Sam, position=guard}, {to=John, position=center}, {to=Andrew, position=forward}]}]",1)], 
           "col1:string, col2:int")
df.show()

这会生成一个数据框,例如:

+--------------------+----+
|                col1|col2|
+--------------------+----+
|[{original={ranki...|   1|
+--------------------+----+

输出我想要 col2 并从响应中添加两列。Col3 将捕获玩家姓名,由 to= 指示,col 4 将其位置由 position= 指示。以及数据框现在将有三行,因为有三个玩家。例子:

+----+------+-------+
|col2|  col3|   col4|
+----+------+-------+
|   1|   Sam|  guard|
|   1|  John| center|
|   1|Andrew|forward|
+----+------+-------+

我读过我可以利用类似的东西:

df.withColumn("col3",explode(from_json("col1")))

但是,鉴于我想要两列而不是一列并且需要架构,我不确定如何展开。

请注意,我可以使用 json_dumps 修改响应以仅返回字符串的响应片段或...

[{to=Sam, position=guard}, {to=John, position=center}, {to=Andrew, position=forward}]}]

标签: jsonpysparkexplode

解决方案


如果您像提到的那样简化输出,您可以定义一个简单的 JSON 模式并将 JSON 字符串转换为StructType并读取每个字段

输入

df = spark.createDataFrame([("[{'to': 'Sam', 'position': 'guard'},{'to': 'John', 'position': 'center'},{'to': 'Andrew', 'position': 'forward'}]",1)], "col1:string, col2:int")

# +-----------------------------------------------------------------------------------------------------------------+----+
# |col1                                                                                                             |col2|
# +-----------------------------------------------------------------------------------------------------------------+----+
# |[{'to': 'Sam', 'position': 'guard'},{'to': 'John', 'position': 'center'},{'to': 'Andrew', 'position': 'forward'}]|1   |
# +-----------------------------------------------------------------------------------------------------------------+----+

这就是转变

from pyspark.sql import functions as F
from pyspark.sql import types as T

schema = T.ArrayType(T.StructType([
    T.StructField('to', T.StringType()),
    T.StructField('position', T.StringType())
]))

(df
    .withColumn('temp', F.explode(F.from_json('col1', schema=schema)))
    .select(
        F.col('col2'),
        F.col('temp.to').alias('col3'),
        F.col('temp.position').alias('col4'),
    )
    .show()
)

# Output
# +----+------+-------+
# |col2|  col3|   col4|
# +----+------+-------+
# |   1|   Sam|  guard|
# |   1|  John| center|
# |   1|Andrew|forward|
# +----+------+-------+

推荐阅读