首页 > 解决方案 > Pyspark从列表中添加一列重复值

问题描述

我有一个 pyspark 数据框,并且想要添加一个列,该列以重复的方式从列表中添加值。如果这只是 python,我可能会使用 itertools 的循环功能。我不知道如何在 pyspark 中执行此操作。

names = ['Julia', 'Tim', 'Zoe']

我的数据框如下所示:

+-----+------+
| id_A| idx_B|
+-----+------+
|    a|     0|       
|    b|     0|    
|    b|     2|       
|    b|     2|       
|    b|     2|       
|    b|     2|      
+-----+------+

我希望它看起来像这样:

+-----+------+--------+
| id_A| idx_B| names  |
+-----+------+--------+
|    a|     0|   Julia|
|    b|     0|     Tim|
|    b|     2|     Zoe|
|    b|     2|   Julia|
|    b|     2|     Tim|
|    b|     2|     Zoe|
+-----+------+--------+

标签: apache-sparkpysparkapache-spark-sql

解决方案


这是一种方法。

1 - 为您的数据框添加一个唯一的增量 ID:

df = spark.createDataFrame(
    df.rdd.zipWithIndex().map(lambda x: Row(*x[0], x[1]))
).toDF("id_A", "idx_B", "id")

df.show()
#+----+-----+---+
#|id_A|idx_B| id|
#+----+-----+---+
#|   a|    0|  0|
#|   b|    0|  1|
#|   b|    2|  2|
#|   b|    2|  3|
#|   b|    2|  4|
#|   b|    2|  5|
#+----+-----+---+

2 - 从名称列表中创建数据框:

names_df = spark.createDataFrame([(idx, name) for idx, name in enumerate(names)], ["name_id", "names"])

3 - 在条件下使用模 3(名称列表的长度)加入:

from pyspark.sql import functions as F

result = df.join(
    names_df,
    F.col("id") % 3 == F.col("name_id")
).orderBy("id").drop("id", "name_id")

result.show()
#+----+-----+-----+
#|id_A|idx_B|names|
#+----+-----+-----+
#|   a|    0|Julia|
#|   b|    0|  Tim|
#|   b|    2|  Zoe|
#|   b|    2|Julia|
#|   b|    2|  Tim|
#|   b|    2|  Zoe|
#+----+-----+-----+

推荐阅读