首页 > 解决方案 > 分解多个列,在 PySpark 中保留列名

问题描述

我有以下 PySpark 数据框(first_df):

ID
0 [“波斯”,“斯芬克斯”] [] [“闹剧”]
1 [] [“斗牛犬”] [“columbaves”,“gruiformes”]
2 [“布娃娃”] [“拉布拉多犬”] []

我想一次分解多个列,将旧列名保留在新列中,例如:

ID 动物 动物类型
0 波斯
0 斯芬克斯
0 伤痕累累
1 斗牛犬
1 哥伦布
1 鱼形目
2 布娃娃
2 拉布拉多犬

到目前为止,我目前的解决方案如下:

animal_types = ['cat', 'dog', 'bird']
df = spark.createDataFrame([], schema=StructType([
    StructField('id', StringType()),
    StructField('animal', StringType()),
    StructField('animal_type', StringType())
]))

for animal_type in animal_types:
  df = first_df \
    .select('id', animal_type) \
    .withColumn('animal', F.explode(animal_type)) \
    .drop(animal_type) \
    .withColumn('animal_type', F.lit(animal_type.upper())) \
    .union(df)

但是我发现它的效率很低,尤其是在集群中工作时。

有没有更好的火花方式来实现这一点?

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


您可以取消旋转和分解数组:

df2 = df.selectExpr(
    'id', 
    'stack(' + str(len(df.columns[1:])) + ', ' + ', '.join(["%s, '%s'" % (col,col) for col in df.columns[1:]]) + ') as (animal, animal_type)'
).withColumn(
    'animal', 
    F.explode('animal')
)

df2.show()
+---+----------+-----------+
| id|    animal|animal_type|
+---+----------+-----------+
|  0| strisores|       bird|
|  0|    persan|        cat|
|  0|    sphynx|        cat|
|  1|columbaves|       bird|
|  1|gruiformes|       bird|
|  1|   bulldog|        dog|
|  2|   ragdoll|        cat|
|  2|  labrador|        dog|
+---+----------+-----------+

推荐阅读