python - PySpark 在值变化时计算平均值
问题描述
我有一个 df 测量燃料水平%和加油状态。加油时值为 1,否则为 0。这是一个示例 df
+-----------------------+---------+---------+
|timestamp |level % |refueling|
+-----------------------+---------+---------+
|2020-08-01 00:09:41 |53.0 |1 |
|2020-08-01 00:14:41 |52.0 |0 |
|2020-08-01 02:19:41 |51.0 |0 |
|2020-08-01 04:24:41 |50.0 |0 |
|2020-08-01 06:29:41 |49.0 |0 |
|2020-08-01 08:44:41 |83.0 |1 |
|2020-08-01 10:49:41 |82.0 |0 |
|2020-08-01 12:54:41 |81.0 |0 |
|2020-08-01 14:59:41 |80.0 |0 |
|2020-08-01 16:04:41 |79.0 |0 |
|2020-08-01 18:09:41 |92.0 |1 |
|2020-08-01 20:14:41 |91.0 |0 |
|2020-08-01 22:19:41 |90.0 |0 |
|2020-08-02 00:24:41 |89.0 |0 |
+-----------------------+---------+---------+
我想计算值的平均值,level %
直到加油发生(refueling
列值为 1),将计算出的平均值写为新列,并通过考虑下一个重复相同的过程以下是我想要达到的输出
+-----------------------+---------+---------+---------+
|timestamp |level % |refueling|average |
+-----------------------+---------+---------+---------+
|2020-08-01 00:09:41 |53.0 |1 |45.9 |--> from previous calculations
|2020-08-01 00:14:41 |52.0 |0 |null |
|2020-08-01 02:19:41 |51.0 |0 |null |
|2020-08-01 04:24:41 |50.0 |0 |null |
|2020-08-01 06:29:41 |49.0 |0 |null |
|2020-08-01 08:44:41 |83.0 |1 |51.0 |--> (53+52+51+50+49)/5
|2020-08-01 10:49:41 |82.0 |0 |null |
|2020-08-01 12:54:41 |81.0 |0 |null |
|2020-08-01 14:59:41 |80.0 |0 |null |
|2020-08-01 16:04:41 |79.0 |0 |null |
|2020-08-01 18:09:41 |92.0 |1 |81.0 |--> (83+82+81+80+79)/5
|2020-08-01 20:14:41 |91.0 |0 |null |
|2020-08-01 22:19:41 |90.0 |0 |null |
|2020-08-02 00:24:41 |89.0 |0 |null |
+-----------------------+---------+---------+---------+
我尝试window
了滚动平均功能,
w = Window.partitionBy(F.col("refueling")).orderBy(F.col("timestamp").asc())
df_diesel_rolling_ave = measurement_df_diesel.withColumn('average', F.avg("value_dbl").over(w))
但它为每一行生成连续的平均值,而不是在 every 之后开始新的平均值计算refueling 1
。这样做的方法是什么?
解决方案
这是我的尝试,您可以将初始值从 0 更改为您想要的。
from pyspark.sql import Window
w1 = Window.orderBy('timestamp')
w2 = Window.partitionBy('group')
r = df.withColumn('group', sum('refueling').over(w1)) \
.withColumn('avg', when(expr('refueling == 1'), lag(avg('level %').over(w2), 1, 0).over(w1)))
r.show()
+-------------------+-------+---------+-----+----+
| timestamp|level %|refueling|group| avg|
+-------------------+-------+---------+-----+----+
|2020-08-01 00:09:41| 53.0| 1| 1| 0.0|
|2020-08-01 00:14:41| 52.0| 0| 1|null|
|2020-08-01 02:19:41| 51.0| 0| 1|null|
|2020-08-01 04:24:41| 50.0| 0| 1|null|
|2020-08-01 06:29:41| 49.0| 0| 1|null|
|2020-08-01 08:44:41| 83.0| 1| 2|51.0|
|2020-08-01 10:49:41| 82.0| 0| 2|null|
|2020-08-01 12:54:41| 81.0| 0| 2|null|
|2020-08-01 14:59:41| 80.0| 0| 2|null|
|2020-08-01 16:04:41| 79.0| 0| 2|null|
|2020-08-01 18:09:41| 92.0| 1| 3|81.0|
|2020-08-01 20:14:41| 91.0| 0| 3|null|
|2020-08-01 22:19:41| 90.0| 0| 3|null|
|2020-08-02 00:24:41| 89.0| 0| 3|null|
+-------------------+-------+---------+-----+----+
推荐阅读
- sql - 获取多个表的最大值
- c# - 如何调用向导添加新的项目模板
- java - 绑定变量上的 SQL 连接
- angular - 如何打开 Angular 服务器?
- php - Libresolv 对 Alpine linux 的依赖
- javascript - 将脚本标签插入 Angular 4 HTML 组件
- json - Powershell中ConvertFrom-Json后访问Json数据的问题
- javascript - 如何在浏览器中对文本进行分页?
- elasticsearch - Metricbeat 发送进程指标,即使它被禁用
- c# - C# 使用 MySqlDataAdapter.Update(dataTable) 将记录更新和插入 MySQL 数据库