首页 > 解决方案 > 如何在 Spark 数据框中添加具有序列值的列?

问题描述

如何从 PySpark 数据框中的特定数字添加具有序列值的列?

当前数据集:

Col1    Col2    Flag
Val1    Val2    F
Val3    Val4    T

但我希望数据集是这样的:

Col1    Col2    Flag    New_Col
Val1    Val2    F       11F
Val3    Val4    T       12T

我在 Python 中使用下面的代码。

from pyspark.sql import functions as F
from pyspark.sql import types as T

seq = 10

def fn_increment_id(flag):
    global seq
    seq += 1
    return str(seq) + flag

if __name__ == "__main__":
    df = spark.loadFromMapRDB("path/to/table")
    my_udf = F.UserDefinedFunction(fn_increment_id, T.StringType())
    df = df.withColumn("New_Col", my_udf("Flag"))
    print(df.show(10))

但是,我最终得到了结果:

Received Dataset:

Col1    Col2    Flag    New_Col
Val1    Val2    F       11F
Val3    Val4    T       11T

因此,它为所有行增加一次。如何增加每一行?提前致谢。

标签: pythonapache-sparkpysparkapache-spark-sqlapache-spark-dataset

解决方案


可以使用 . 添加具有顺序值的列Window。只要数据框不太大,这很好,对于较大的数据框,您应该考虑partitionBy在窗口上使用,但值不会是连续的。

下面的代码为每一行创建序列号,将 10 添加到它,然后将值与Flag列连接以创建一个新列。这里的行排序,Col1但可以使用任何列。

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, concat

w = Window().orderBy("Col1")
df = df.withColumn("New_Col", concat(row_number().over(w) + 10, col(Flag)))

推荐阅读