首页 > 解决方案 > Pyspark Groupby 创建列

问题描述

在 Pyspark 中,我需要分组ID并创建四个新列(min、max、std、ave)。

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = (Window.orderBy(F.col("Date").cast('long')).rowsBetween(-4, 0))

df = df.groupby("ID") \
.withColumn('hr1_ave', F.avg("rpm").over(w))\
.withColumn('hr1_std', F.stddev("rpm").over(w))\
.withColumn('hr1_min', F.min("rpm").over(w))\
.withColumn('hr1_max', F.max("rpm").over(w))

我也试过:

df.groupby("ID").select('rpm', f.avg('rpm').over(w).alias('hr1_avg'))

但是,这两个命令我都收到此错误:

AttributeError: 'GroupedData' object has no attribute 'withColumn'

有没有办法为每个列创建一个新列ID并创建这些列,或者我的语法不正确?

谢谢。

标签: pythonapache-sparkpysparkgroup-by

解决方案


您需要将“分组”列移动ID到窗口定义中作为partitionBy. 那么groupBy就没有必要了:

编码

w = Window.partitionBy("ID").orderBy(F.col("Date").cast('long')).rowsBetween(-4, 0)

df \
.withColumn('hr1_ave', F.avg("rpm").over(w))\
.withColumn('hr1_std', F.stddev("rpm").over(w))\
.withColumn('hr1_min', F.min("rpm").over(w))\
.withColumn('hr1_max', F.max("rpm").over(w)) \
.show()

应该打印您的预期结果。


推荐阅读