首页 > 解决方案 > 根据 pyspark 中一列的变化增加每个分区中的值

问题描述

我想为 PySpark DataFrame 中的每个分区创建一个新列(数字),当列年份发生更改时,该列会递增。

原始数据:

name period year 
A    1      2010
A    1      2010
A    1      2011
A    1      2013
B    1      2018
B    1      2019
C    2      2018
C    2      2018
C    2      2019

预期输出:

name period year  number
A    1      2010  1
A    1      2010  1
A    1      2011  2
A    1      2013  3
B    1      2018  1
B    1      2019  2
C    2      2018  1
C    2      2018  1
C    2      2019  2

标签: pythonpysparkapache-spark-sql

解决方案


创建您提供的示例数据框:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

data = [{"name":'A', "period":1, "year":2010},
        {"name":'A', "period":1, "year":2010},
        {"name":'A', "period":1, "year":2011},
        {"name":'A', "period":1, "year":2013},
        {"name":'B', "period":1, "year":2018},
        {"name":'B', "period":1, "year":2019},
        {"name":'C', "period":2, "year":2018},
        {"name":'C', "period":2, "year":2018},
        {"name":'C', "period":2, "year":2019}]

df = spark.createDataFrame(data)

使用窗口函数对数据帧进行分区,然后根据该分区进行dense_rank:

window = (Window.partitionBy('name').orderBy(F.col('year').asc()))

df = df.withColumn('number', F.dense_rank().over(window)).orderBy("name", "year")

结果:

在此处输入图像描述


推荐阅读