Aggregating over parts of a DataFrame in PySpark

Problem description

Is it possible to aggregate over only part of a DataFrame? Or is there an efficient way to split a DataFrame on a given condition?

Say I have a DataFrame like the following:

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1588119659000| 3489692692      |        3.0|   0.239999     |   11.2699 |   
|1587497991000| 3489692692      |        2.0|   0.159999     |   21.6999 | 
|1587864812000| 3489692692      |        2.0|   0.959999     |   359.649 |  
|1587581329000| 3489692692      |        1.0|   1.039999     |   336.209 |  
|1587581329000| 3489692692      |        3.0|   1.039999     |   336.299 |   
|1587581329000| 3489692692      |        1.0|   2.799999     |   336.209 |   

Assuming an event starts at state=3 and ends at state=1, is there an efficient way to split by "event"? I would like the smaller DataFrames contained between those states, which in this small case would be:

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1588119659000| 3489692692      |        3.0|   0.239999     |   11.2699 |   
|1587497991000| 3489692692      |        2.0|   0.159999     |   21.6999 | 
|1587864812000| 3489692692      |        2.0|   0.959999     |   359.649 |  
|1587581329000| 3489692692      |        1.0|   1.039999     |   336.209 | 

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1587581329000| 3489692692      |        3.0|   1.039999     |   336.299 |   
|1587581329000| 3489692692      |        1.0|   2.799999     |   336.209 |  

My end goal is to have another DataFrame that aggregates the values based on the start and end epochs, for example:

+-------------+---------------+-------------+--------------+-------------+
|  ID         |start_epoch    |end_epoch    | max(value 1) | max(value 2)|
+-------------+---------------+-------------+--------------+-------------+
|3489692692   |1588119659000  |1587581329000|1.039999      |359.649      |
|3489692692   |1587581329000  |1587581329000|2.799999      |336.299      |

Previously, when I wasn't handling this much data, I used pandas to iterate over the DataFrame and build the new one row by row, but yes, that is not very efficient. Any hint pointing me in the right direction would be greatly appreciated.

----- UPDATE -----

I think the following is a better example of the data I am working with:

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1585766054000| 3489692692      |        3.0|   0.159999     |   7.58996 |
|1585766055000| 3489692692      |        3.0|   0.239999     |   11.2699 |  
|1585766058000| 3489692692      |        3.0|   0.135489     |   13.8790 |
|1587497991000| 3489692692      |        2.0|   0.159999     |   21.6999 | 
|1587864812000| 3489692692      |        2.0|   0.959999     |   359.649 |  
|1587581329000| 3489692692      |        1.0|   1.039999     |   336.209 |  
|1587581339000| 3489692692      |        3.0|   1.039999     |   336.299 | 
|1587581329000| 3489692692      |        1.0|   2.799999     |   336.209 |
|1588088096000| 3489692670      |        3.0|   2.869564     |   285.963 |
|1588088099000| 3489692670      |        2.0|   0.758753     |   299.578 |
|1588088199000| 3489692670      |        1.0|   3.965424     |   5.89677 |

Things to consider:

The result for the example above should look something like:

+-------------+---------------+-------------+--------------+-------------+
|  ID         |start_epoch    |end_epoch    | max(value 1) | max(value 2)|
+-------------+---------------+-------------+--------------+-------------+
|3489692692   |1585766054000  |1587581329000|1.039999      |359.649      |
|3489692692   |1587581339000  |1587581329000|2.799999      |336.299      |
|3489692670   |1588088096000  |1588088199000|3.965424      |299.578      |

Tags: pyspark

Solution


Splitting the DataFrame would be counter-intuitive here; instead, express your logic with PySpark's built-in aggregation functions (window + groupBy). The code works as long as the data is ordered the way you presented it, since the order of some rows cannot otherwise be determined (rows 2 and 3 share the same state but have different epoch_ms values). The logic is an incremental sum of a condition on state, which marks out the start/end of each group. Try it and let me know.

df.show() #sampledata
#+-------------+----------+-----+--------+-------+
#|     epoch_ms|        ID|state| value 1|value 2|
#+-------------+----------+-----+--------+-------+
#|1588119659000|3489692692|  3.0|0.239999|11.2699|
#|1587497991000|3489692692|  2.0|0.159999|21.6999|
#|1587864812000|3489692692|  2.0|0.959999|359.649|
#|1587581329000|3489692692|  1.0|1.039999|336.209|
#|1587581329000|3489692692|  3.0|1.039999|336.299|
#|1587581329000|3489692692|  1.0|2.799999|336.209|
#+-------------+----------+-----+--------+-------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# w freezes the order in which the rows arrive (per ID); w2 orders by that row
# number so the cumulative sum below walks the rows in their presented order.
w=Window().partitionBy("ID").orderBy(F.lit(1))
w2=Window().partitionBy("ID").orderBy("rowNum")

# inc_sum increments at every state==3 row, so each event gets its own group id.
df.withColumn("rowNum", F.row_number().over(w))\
  .withColumn("inc_sum", F.sum(F.when(F.col("state")==3,F.lit(1)).otherwise(F.lit(0))).over(w2))\
  .groupBy("inc_sum").agg(F.first("ID").alias("ID"),\
                          F.max("epoch_ms").alias("start_epoch"),\
                          F.min("epoch_ms").alias("end_epoch"),\
                          F.max("value 1").alias("max_value1"),\
                          F.max("value 2").alias("max_value2")).show()

#+-------+----------+-------------+-------------+----------+----------+
#|inc_sum|        ID|  start_epoch|    end_epoch|max_value1|max_value2|
#+-------+----------+-------------+-------------+----------+----------+
#|      1|3489692692|1588119659000|1587497991000|  1.039999|   359.649|
#|      2|3489692692|1587581329000|1587581329000|  2.799999|   336.299|
#+-------+----------+-------------+-------------+----------+----------+
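To make the grouping easier to follow, here is a small inspection step of my own (a minimal sketch, not part of the answer's code) that shows the intermediate inc_sum column before the groupBy; it reuses df, F, w and w2 from the block above:

df.withColumn("rowNum", F.row_number().over(w))\
  .withColumn("inc_sum", F.sum(F.when(F.col("state")==3,F.lit(1)).otherwise(F.lit(0))).over(w2))\
  .select("rowNum", "epoch_ms", "state", "inc_sum").show()

# Assuming the presented order is preserved, inc_sum should come out as
# 1,1,1,1,2,2 for the six sample rows, i.e. one value per event.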

UPDATE:

Try this. I use a lag condition (state==3 while the previous state is not 3) to single out the start of each event, and then an incremental sum over that flag to get our groups.

df.show() #sampledata
#+-------------+----------+-----+--------+-------+
#|     epoch_ms|        ID|state| value 1|value 2|
#+-------------+----------+-----+--------+-------+
#|1585766054000|3489692692|  3.0|0.159999|7.58996|
#|1585766055000|3489692692|  3.0|0.239999|11.2699|
#|1585766058000|3489692692|  3.0|0.135489| 13.879|
#|1587497991000|3489692692|  2.0|0.159999|21.6999|
#|1587864812000|3489692692|  2.0|0.959999|359.649|
#|1587581329000|3489692692|  1.0|1.039999|336.209|
#|1587581339000|3489692692|  3.0|1.039999|336.299|
#|1587581329000|3489692692|  1.0|2.799999|336.209|
#|1588088096000|3489692670|  3.0|2.869564|285.963|
#|1588088099000|3489692670|  2.0|0.758753|299.578|
#|1588088199000|3489692670|  1.0|3.965424|5.89677|
#+-------------+----------+-----+--------+-------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One global ordering over the generated row ids; no partitioning, because
# events are defined by contiguous rows across the whole data set.
w=Window().orderBy("rowNum")

# A row starts a new event when state==3 and the previous state is not 3;
# the cumulative sum of that flag gives every row the id of its event.
df.withColumn("rowNum", F.monotonically_increasing_id())\
  .withColumn("inc_sum", F.sum(F.when((F.col("state")==3) & (F.lag("state").over(w)!=3),\
                                      F.lit(1)).otherwise(F.lit(0))).over(w))\
  .groupBy("inc_sum").agg(F.first("ID").alias("ID"),\
                          F.first("epoch_ms").alias("start_epoch"),\
                          F.last("epoch_ms").alias("end_epoch"),\
                          F.max("value 1").alias("max_value1"),\
                          F.max("value 2").alias("max_value2")).drop("inc_sum").show()

#+----------+-------------+-------------+----------+----------+
#|        ID|  start_epoch|    end_epoch|max_value1|max_value2|
#+----------+-------------+-------------+----------+----------+
#|3489692692|1585766054000|1587581329000|  1.039999|   359.649|
#|3489692692|1587581339000|1587581329000|  2.799999|   336.299|
#|3489692670|1588088096000|1588088199000|  3.965424|   299.578|
#+----------+-------------+-------------+----------+----------+
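If you do still need the smaller per-event DataFrames from the original question, the same inc_sum tag can be used to split them out. Below is a minimal sketch under the same ordering assumption; the distinct-key collect() and the Python list of DataFrames are my additions, not part of the answer above:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().orderBy("rowNum")  # same global ordering as above

# Tag every row with the id of the event it belongs to.
tagged=df.withColumn("rowNum", F.monotonically_increasing_id())\
         .withColumn("inc_sum", F.sum(F.when((F.col("state")==3) & (F.lag("state").over(w)!=3),\
                                              F.lit(1)).otherwise(F.lit(0))).over(w))

# One small DataFrame per event; only the distinct group keys are collected to the driver.
event_keys=[r["inc_sum"] for r in tagged.select("inc_sum").distinct().collect()]
events=[tagged.filter(F.col("inc_sum")==k) for k in event_keys]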
