python - Pyspark Groupby 创建列
问题描述
在 Pyspark 中,我需要分组ID
并创建四个新列(min、max、std、ave)。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = (Window.orderBy(F.col("Date").cast('long')).rowsBetween(-4, 0))
df = df.groupby("ID") \
.withColumn('hr1_ave', F.avg("rpm").over(w))\
.withColumn('hr1_std', F.stddev("rpm").over(w))\
.withColumn('hr1_min', F.min("rpm").over(w))\
.withColumn('hr1_max', F.max("rpm").over(w))
我也试过:
df.groupby("ID").select('rpm', f.avg('rpm').over(w).alias('hr1_avg'))
但是,这两个命令我都收到此错误:
AttributeError: 'GroupedData' object has no attribute 'withColumn'
有没有办法为每个列创建一个新列ID
并创建这些列,或者我的语法不正确?
谢谢。
解决方案
您需要将“分组”列移动ID
到窗口定义中作为partitionBy
. 那么groupBy
就没有必要了:
编码
w = Window.partitionBy("ID").orderBy(F.col("Date").cast('long')).rowsBetween(-4, 0)
df \
.withColumn('hr1_ave', F.avg("rpm").over(w))\
.withColumn('hr1_std', F.stddev("rpm").over(w))\
.withColumn('hr1_min', F.min("rpm").over(w))\
.withColumn('hr1_max', F.max("rpm").over(w)) \
.show()
应该打印您的预期结果。
推荐阅读
- sql - 请解释一下sql中的临时表?
- python - 使用 boto3 将 s3 存储桶中的所有文件从 s3 帐户移动到另一个帐户
- java - ForLoop 中的 BigInteger 和 Java 中的列表
- nginx - 根据upstream的状态码触发openidc认证
- python - 以列表为键创建字典
- neo4j - Neo4J WITH 子句在添加属性时没有给出记录
- javascript - 谷歌表格脚本
- intellij-idea - Intellij 实时模板将变量转换为常量
- python - 使用正则表达式从熊猫数据框python中删除在单个列的所有行中找到的唯一单词
- mysql - 根据另一个数据点是否不为空来更新MYSQL数据点?