首页 > 解决方案 > PySpark 将列拆分为具有应用架构的新数据框

问题描述

如何通过逗号将字符串列拆分为具有应用架构的新数据框?

例如,这是一个 pyspark DataFrame,它有两列 (idvalue)

df = sc.parallelize([(1, "200,201,hello"), (2, "23,24,hi")]).toDF(["id", "value"])

我想获取该value列并将其拆分为一个新的 DataFrame 并应用以下架构:

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

message_schema = StructType(
    [
        StructField("id", IntegerType()),
        StructField("value", IntegerType()),
        StructField("message", StringType()),
    ]
)

可行的是:

df_split = (
    df.select(split(df.value, ",\s*"))
    .rdd.flatMap(lambda x: x)
    .toDF()
)
df_split.show()

但我仍然需要根据架构转换和重命名列:

df_split.select(
    [
        col(_name).cast(_schema.dataType).alias(_schema.name)
        for _name, _schema in zip(df_split.columns, message_schema)
    ]
).show()

与预期的结果:

+---+-----+-------+
| id|value|message|
+---+-----+-------+
|200|  201|  hello|
| 23|   24|     hi|
+---+-----+-------+

标签: python-3.xapache-sparkpysparkapache-spark-sql

解决方案


对于 Spark 3+,有一个函数from_csv可用于使用message_schemaDDL 格式的模式解析逗号分隔的字符串:

import pyspark.sql.functions as F

df1 = df.withColumn(
    "message",
    F.from_csv("value", message_schema.simpleString())
).select("message.*")

df1.show()
#+---+-----+-------+
#| id|value|message|
#+---+-----+-------+
#|200|  201|  hello|
#| 23|   24|     hi|
#+---+-----+-------+

df1.printSchema()
#root
# |-- id: integer (nullable = true)
# |-- value: integer (nullable = true)
# |-- message: string (nullable = true)

推荐阅读