首页 > 解决方案 > Pyspark 根据列值复制行

问题描述

我想根据每行上给定列的值复制我的 DataFrame 中的所有行,然后索引每个新行。假设我有:

Column A Column B
T1       3
T2       2

我希望结果是:

Column A Column B Index
T1       3        1
T1       3        2
T1       3        3
T2       2        1
T2       2        2

我能够使用固定值进行类似的操作,但不能使用列上的信息。我当前的固定值工作代码是:

idx = [lit(i) for i in range(1, 10)]
df = df.withColumn('Index', explode(array( idx ) ))

我试图改变:

lit(i) for i in range(1, 10) 

lit(i) for i in range(1, df['Column B'])

并将其添加到我的 array() 函数中:

df = df.withColumn('Index', explode(array( lit(i) for i in range(1, df['Column B']) ) ))

但它不起作用(TypeError: 'Column' 对象不能被解释为整数)。

我应该如何实现这个?

标签: apache-sparkpysparkapache-spark-sqlpyspark-sql

解决方案


不幸的是,您不能像这样遍历 Column。您始终可以使用 a ,但如果您使用的是 Spark 2.1 或更高版本udf,我确实有一个非 udf hack解决方案应该适合您。

诀窍是利用pyspark.sql.functions.posexplode()来获取索引值。我们通过重复逗号Column B时间创建一个字符串来做到这一点。然后我们用逗号分割这个字符串,并用它posexplode来获取索引。

df.createOrReplaceTempView("df")  # first register the DataFrame as a temp table

query = 'SELECT '\
    '`Column A`,'\
    '`Column B`,'\
    'pos AS Index '\
    'FROM ( '\
        'SELECT DISTINCT '\
        '`Column A`,'\
        '`Column B`,'\
        'posexplode(split(repeat(",", `Column B`), ",")) '\
        'FROM df) AS a '\
    'WHERE a.pos > 0'
newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
newDF.show()
#+--------+--------+-----+
#|Column A|Column B|Index|
#+--------+--------+-----+
#|      T1|       3|    1|
#|      T1|       3|    2|
#|      T1|       3|    3|
#|      T2|       2|    1|
#|      T2|       2|    2|
#+--------+--------+-----+

注意:您需要将列名包含在反引号中,因为它们中有空格,如本文所述:如何在 Spark SQL 中表达名称中包含空格的列


推荐阅读