pyspark - pyspark中部分数据帧的聚合
问题描述
是否可以对部分数据帧进行聚合?或者是否可以在给定条件下有效地拆分数据帧?
假设我有一个如下所示的数据框:
+-------------+-----------------+-----------+----------------+-----------+
| epoch_ms|ID | state | value 1 | value 2 |
+-------------+-----------------+-----------+----------------+-----------+
|1588119659000| 3489692692 | 3.0| 0.239999 | 11.2699 |
|1587497991000| 3489692692 | 2.0| 0.159999 | 21.6999 |
|1587864812000| 3489692692 | 2.0| 0.959999 | 359.649 |
|1587581329000| 3489692692 | 1.0| 1.039999 | 336.209 |
|1587581329000| 3489692692 | 3.0| 1.039999 | 336.299 |
|1587581329000| 3489692692 | 1.0| 2.799999 | 336.209 |
假设一个事件以 state=3 开始并以 state=1 结束,是否有一种有效的方法来按“事件”进行拆分,我希望在这些状态之间包含更小的数据帧,在这种小情况下:
+-------------+-----------------+-----------+----------------+-----------+
| epoch_ms|ID | state | value 1 | value 2 |
+-------------+-----------------+-----------+----------------+-----------+
|1588119659000| 3489692692 | 3.0| 0.239999 | 11.2699 |
|1587497991000| 3489692692 | 2.0| 0.159999 | 21.6999 |
|1587864812000| 3489692692 | 2.0| 0.959999 | 359.649 |
|1587581329000| 3489692692 | 1.0| 1.039999 | 336.209 |
和
+-------------+-----------------+-----------+----------------+-----------+
| epoch_ms|ID | state | value 1 | value 2 |
+-------------+-----------------+-----------+----------------+-----------+
|1587581329000| 3489692692 | 3.0| 1.039999 | 336.299 |
|1587581329000| 3489692692 | 1.0| 2.799999 | 336.209 |
我的最终目标是拥有另一个基于开始和结束时期对值进行聚合的数据框,例如:
+-------------+---------------+-------------+--------------+-------------+
| ID |start epoch |end_epoch | max(value 1) | max(value 2)|
+-------------+---------------+-------------+--------------+-------------+
|3489692692 |1588119659000 |1587581329000|1.039999 |359.649 |
|3489692692 |1587581329000 |1587581329000|2.799999 |336.299 |
以前,当我不处理太多数据时,我使用 pandas 迭代数据帧并逐行构建新的数据帧,但是,是的,这不是很有效。任何提示我正确方向的提示将不胜感激。
- - - -###更新### - - - - -
我想下面是我正在使用的数据的更好示例:
+-------------+-----------------+-----------+----------------+-----------+
| epoch_ms|ID | state | value 1 | value 2 |
+-------------+-----------------+-----------+----------------+-----------+
|1585766054000| 3489692692 | 3.0| 0.159999 | 7.58996 |
|1585766055000| 3489692692 | 3.0| 0.239999 | 11.2699 |
|1585766058000| 3489692692 | 3.0| 0.135489 | 13.8790 |
|1587497991000| 3489692692 | 2.0| 0.159999 | 21.6999 |
|1587864812000| 3489692692 | 2.0| 0.959999 | 359.649 |
|1587581329000| 3489692692 | 1.0| 1.039999 | 336.209 |
|1587581339000| 3489692692 | 3.0| 1.039999 | 336.299 |
|1587581329000| 3489692692 | 1.0| 2.799999 | 336.209 |
|1588088096000| 3489692670 | 3.0| 2.869564 | 285.963 |
|1588088099000| 3489692670 | 2.0| 0.758753 | 299.578 |
|1588088199000| 3489692670 | 1.0| 3.965424 | 5.89677 |
需要考虑的事项:
- 事件以状态 3 开始,以状态 1 结束
- 状态可以重复,例如状态 3 或 2 可以在开始后多次出现,但事件必须包含它们全部,直到状态 1 出现。
- 状态 1 之后的其他状态可能发生,状态 1 多次或状态 2,但下一个事件直到状态再次为 3 才会开始,状态 1 和 3 之间的任何内容(前一个事件的结束和新事件的开始)都应该被忽略。
- 如果数据帧以 3 以外的状态结束,则应假定最后发生了 3。
- 可以有多个 ID,并且数据帧按 epoch 和 id 排序。
上述示例的结果应类似于:
+-------------+---------------+-------------+--------------+-------------+
| ID |start epoch |end_epoch | max(value 1) | max(value 2)|
+-------------+---------------+-------------+--------------+-------------+
|3489692692 |1585766054000 |1587581329000|1.039999 |359.649 |
|3489692692 |1587581339000 |1587581329000|2.799999 |336.299 |
|3489692670 |1588088096000 |1588088199000|3.965424 |299.578 |
解决方案
拆分将是反直觉的,您应该使用pyspark in-built
聚合函数(window + groupBy
)来表达您的逻辑。只要按照您呈现的方式对数据进行排序,代码就可以正常工作(因为无法确定某些行的顺序,因为对于同一状态(第 2,3 行)您有不同的 epoch_ms。逻辑是在 state 上使用incremental sum
条件来找出你的分组start/end
。试试 lmk。
df.show() #sampledata
#+-------------+----------+-----+--------+-------+
#| epoch_ms| ID|state| value 1|value 2|
#+-------------+----------+-----+--------+-------+
#|1588119659000|3489692692| 3.0|0.239999|11.2699|
#|1587497991000|3489692692| 2.0|0.159999|21.6999|
#|1587864812000|3489692692| 2.0|0.959999|359.649|
#|1587581329000|3489692692| 1.0|1.039999|336.209|
#|1587581329000|3489692692| 3.0|1.039999|336.299|
#|1587581329000|3489692692| 1.0|2.799999|336.209|
#+-------------+----------+-----+--------+-------+
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("ID").orderBy(F.lit(1))
w2=Window().partitionBy("ID").orderBy("rowNum")
df.withColumn("rowNum", F.row_number().over(w))\
.withColumn("inc_sum", F.sum(F.when(F.col("state")==3,F.lit(1)).otherwise(F.lit(0))).over(w2))\
.groupBy("inc_sum").agg(F.first("ID").alias("ID"),\
F.max("epoch_ms").alias("start_epoch"),\
F.min("epoch_ms").alias("end_epoch"),F.max("value 1").alias("max_value1"),\
F.max("value 2").alias("max_value2")).drop("inc_sum").show()
#+-------+----------+-------------+-------------+----------+----------+
#|inc_sum| ID| start_epoch| end_epoch|max_value1|max_value2|
#+-------+----------+-------------+-------------+----------+----------+
#| 1|3489692692|1588119659000|1587497991000| 1.039999| 359.649|
#| 2|3489692692|1587581329000|1587581329000| 2.799999| 336.299|
#+-------+----------+-------------+-------------+----------+----------+
UPDATE:
试试这个。我使用lag condition!=3 with state=3
条件single out the start of the event
,然后使用incremental sum
它来获取我们的组。
df.show() #sampledata
#+-------------+----------+-----+--------+-------+
#| epoch_ms| ID|state| value 1|value 2|
#+-------------+----------+-----+--------+-------+
#|1585766054000|3489692692| 3.0|0.159999|7.58996|
#|1585766055000|3489692692| 3.0|0.239999|11.2699|
#|1585766058000|3489692692| 3.0|0.135489| 13.879|
#|1587497991000|3489692692| 2.0|0.159999|21.6999|
#|1587864812000|3489692692| 2.0|0.959999|359.649|
#|1587581329000|3489692692| 1.0|1.039999|336.209|
#|1587581339000|3489692692| 3.0|1.039999|336.299|
#|1587581329000|3489692692| 1.0|2.799999|336.209|
#|1588088096000|3489692670| 3.0|2.869564|285.963|
#|1588088099000|3489692670| 2.0|0.758753|299.578|
#|1588088199000|3489692670| 1.0|3.965424|5.89677|
#+-------------+----------+-----+--------+-------+
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().orderBy("rowNum")
df.withColumn("rowNum", F.monotonically_increasing_id())\
.withColumn("inc_sum", F.sum(F.when((F.col("state")==3) & (F.lag("state").over(w)!=3)\
,F.lit(1)).otherwise(F.lit(0)))\
.over(w))\
.groupBy("inc_sum").agg(F.first("ID").alias("ID"),\
F.first("epoch_ms").alias("start_epoch"),\
F.last("epoch_ms").alias("end_epoch"),F.max("value 1").alias("max_value1"),\
F.max("value 2").alias("max_value2")).drop("inc_sum").show()
#+----------+-------------+-------------+----------+----------+
#| ID| start_epoch| end_epoch|max_value1|max_value2|
#+----------+-------------+-------------+----------+----------+
#|3489692692|1585766054000|1587581329000| 1.039999| 359.649|
#|3489692692|1587581339000|1587581329000| 2.799999| 336.299|
#|3489692670|1588088096000|1588088199000| 3.965424| 299.578|
#+----------+-------------+-------------+----------+----------+
推荐阅读
- laravel - Laravel 中的 guzzlephp:发送带有表单数据参数的 post 请求,如邮递员表单数据
- php - 使用 Laravel 保护内部 API 端点
- linux - 在容器内重新启动 Apache2 时出错
- shell - 从 AppleScript 解析文本
- python - 如何将熊猫数据框转换为多索引数据框
- django - 是否可以在包含标签内插入带有变量的字符串?
- javascript - 根据当前视频时间戳显示字幕
- vba - 从 Outlook VBA 下载附件
- javascript - 按下 HTML 按钮不会向 socket.io 发送调用
- c# - 哪种方法最好,制作多个文件还是单个文件?