PySpark: Calculate Average When Value Changes

Problem Description

I have a df that measures fuel level % and refueling status. The value is 1 while refueling and 0 otherwise. Here is a sample df:

+-----------------------+---------+---------+
|timestamp              |level %  |refueling|
+-----------------------+---------+---------+
|2020-08-01 00:09:41    |53.0     |1        |
|2020-08-01 00:14:41    |52.0     |0        |
|2020-08-01 02:19:41    |51.0     |0        |
|2020-08-01 04:24:41    |50.0     |0        |
|2020-08-01 06:29:41    |49.0     |0        |
|2020-08-01 08:44:41    |83.0     |1        |
|2020-08-01 10:49:41    |82.0     |0        |
|2020-08-01 12:54:41    |81.0     |0        |
|2020-08-01 14:59:41    |80.0     |0        |
|2020-08-01 16:04:41    |79.0     |0        |
|2020-08-01 18:09:41    |92.0     |1        |
|2020-08-01 20:14:41    |91.0     |0        |
|2020-08-01 22:19:41    |90.0     |0        |
|2020-08-02 00:24:41    |89.0     |0        |
+-----------------------+---------+---------+
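For anyone who wants to run this locally, here is a minimal sketch that rebuilds the sample df (the SparkSession setup is my assumption, not part of the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumed local session

data = [
    ("2020-08-01 00:09:41", 53.0, 1),
    ("2020-08-01 00:14:41", 52.0, 0),
    ("2020-08-01 02:19:41", 51.0, 0),
    ("2020-08-01 04:24:41", 50.0, 0),
    ("2020-08-01 06:29:41", 49.0, 0),
    ("2020-08-01 08:44:41", 83.0, 1),
    ("2020-08-01 10:49:41", 82.0, 0),
    ("2020-08-01 12:54:41", 81.0, 0),
    ("2020-08-01 14:59:41", 80.0, 0),
    ("2020-08-01 16:04:41", 79.0, 0),
    ("2020-08-01 18:09:41", 92.0, 1),
    ("2020-08-01 20:14:41", 91.0, 0),
    ("2020-08-01 22:19:41", 90.0, 0),
    ("2020-08-02 00:24:41", 89.0, 0),
]
df = spark.createDataFrame(data, ["timestamp", "level %", "refueling"])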

I want to calculate the average of the level % values until a refueling occurs (i.e. the refueling column is 1), write the calculated average into a new column, and repeat the same process for each following segment. Here is the output I want to achieve:

+-----------------------+---------+---------+---------+
|timestamp              |level %  |refueling|average  |
+-----------------------+---------+---------+---------+
|2020-08-01 00:09:41    |53.0     |1        |45.9     |--> from previous calculations
|2020-08-01 00:14:41    |52.0     |0        |null     |
|2020-08-01 02:19:41    |51.0     |0        |null     |
|2020-08-01 04:24:41    |50.0     |0        |null     |
|2020-08-01 06:29:41    |49.0     |0        |null     |
|2020-08-01 08:44:41    |83.0     |1        |51.0     |--> (53+52+51+50+49)/5
|2020-08-01 10:49:41    |82.0     |0        |null     |
|2020-08-01 12:54:41    |81.0     |0        |null     |
|2020-08-01 14:59:41    |80.0     |0        |null     |
|2020-08-01 16:04:41    |79.0     |0        |null     |
|2020-08-01 18:09:41    |92.0     |1        |81.0     |--> (83+82+81+80+79)/5
|2020-08-01 20:14:41    |91.0     |0        |null     |
|2020-08-01 22:19:41    |90.0     |0        |null     |
|2020-08-02 00:24:41    |89.0     |0        |null     |
+-----------------------+---------+---------+---------+

I tried a rolling average with a window function:

from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy(F.col("refueling")).orderBy(F.col("timestamp").asc())
df_diesel_rolling_ave = measurement_df_diesel.withColumn('average', F.avg("value_dbl").over(w))

but it produces a running average for every row, instead of starting a new average calculation after each refueling == 1. What is the way to do this?

Tags: python, apache-spark, pyspark

Solution


Here is my attempt; you can change the initial value from 0 to whatever you prefer.

from pyspark.sql import Window
from pyspark.sql.functions import avg, expr, lag, sum, when

w1 = Window.orderBy('timestamp')
w2 = Window.partitionBy('group')

# The running sum of the refueling flag increments at each refueling row,
# so every segment between refuelings gets its own group id. On refueling
# rows, lag pulls the previous row's group average, i.e. the average of
# the segment that just ended (default 0 for the very first segment).
r = df.withColumn('group', sum('refueling').over(w1)) \
  .withColumn('avg', when(expr('refueling == 1'), lag(avg('level %').over(w2), 1, 0).over(w1)))

r.show()

+-------------------+-------+---------+-----+----+
|          timestamp|level %|refueling|group| avg|
+-------------------+-------+---------+-----+----+
|2020-08-01 00:09:41|   53.0|        1|    1| 0.0|
|2020-08-01 00:14:41|   52.0|        0|    1|null|
|2020-08-01 02:19:41|   51.0|        0|    1|null|
|2020-08-01 04:24:41|   50.0|        0|    1|null|
|2020-08-01 06:29:41|   49.0|        0|    1|null|
|2020-08-01 08:44:41|   83.0|        1|    2|51.0|
|2020-08-01 10:49:41|   82.0|        0|    2|null|
|2020-08-01 12:54:41|   81.0|        0|    2|null|
|2020-08-01 14:59:41|   80.0|        0|    2|null|
|2020-08-01 16:04:41|   79.0|        0|    2|null|
|2020-08-01 18:09:41|   92.0|        1|    3|81.0|
|2020-08-01 20:14:41|   91.0|        0|    3|null|
|2020-08-01 22:19:41|   90.0|        0|    3|null|
|2020-08-02 00:24:41|   89.0|        0|    3|null|
+-------------------+-------+---------+-----+----+
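If your Spark version complains about a window function nested inside another window function, an equivalent two-step variant (my own rewrite, not part of the original answer) materializes the group average as a plain column first and then lags it:

from pyspark.sql import Window
import pyspark.sql.functions as F

w1 = Window.orderBy('timestamp')

r2 = (df
    # cumulative sum of the flag: a new group id starts at each refueling row
    .withColumn('group', F.sum('refueling').over(w1))
    # average fuel level within each group, materialized as a regular column
    .withColumn('group_avg', F.avg('level %').over(Window.partitionBy('group')))
    # on refueling rows, take the previous row's group average, i.e. the
    # average of the segment that just ended; 0 is the initial value
    .withColumn('avg', F.when(F.col('refueling') == 1,
                              F.lag('group_avg', 1, 0).over(w1)))
    .drop('group_avg'))

r2.show()

Passing None instead of 0 as the third argument to F.lag keeps the first refueling row null rather than 0.0, if that suits your data better.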
