首页 > 解决方案 > pyspark:查找有条件的行之间的时间差异

问题描述

您好我正在尝试计算某些列中的值之间的差异总和(以毫秒为单位),这些值取决于另一列的值。

更详细地说,我有以下 pyspark 数据框:

d = spark.createDataFrame(
    [(133515, "user1", 1562889600046, 'begin'), 
     (789456, "user2", 1562889600246, 'begin'),
     (789456, "user2", 1562889603046, 'end'),
     (712346, "user3", 1562889600046, 'begin'),
     (789456, "user4", 1562889700046, 'begin'),
     (133515, "user1", 1562889640046, 'end'),
     (712346, "user3", 1562889602046, 'end'),
     (789456, "user4", 1562889800046, 'end'),
     (789456, "user4", 1562889850046, 'begin'),
     (789456, "user4", 1562889903046, 'end'),
     (133515, "user1", 1562889645046, 'begin'),
     (133515, "user1", 1562889745046, 'end')

    ], ("ID", "user", "epoch", "ACTION"))
d.show()

我希望得到以下输出:

+------+-----+-----------+
|    ID| user|summed diff|
+------+-----+-----------+
|133515|user1|      50000|
|789456|user2|       2800|
|712346|user3|       2000|
|789456|user4|     153000|
+------+-----+-----------+

列中的每个值summed diff都是通过将“结束”时期和与该特定用户相关的最后一个“开始”之间的毫秒差求和获得的。

你能指导我如何解决这个问题吗?

如果我想按一天或一天​​中的小时分组怎么办?

标签: python-3.xpysparkpyspark-sql

解决方案


试试这个:

from pyspark.sql import functions as F
from pyspark.sql.functions import lit

d_final = d.select(F.col("ID"), F.col("user"), F.when(F.col("ACTION") == lit("begin"), -F.col("epoch")).otherwise(F.col("epoch")).alias("epoch_temp")).groupBy(F.col("ID"), F.col("user")).agg(F.sum(F.col("epoch_temp")).alias("summed_diff"))

结果:

>>> d_final.show()
+------+-----+-----------+
|    ID| user|summed_diff|
+------+-----+-----------+
|789456|user4|     153000|
|712346|user3|       2000|
|133515|user1|     140000|
|789456|user2|       2800|
+------+-----+-----------+


编辑- 使用 udf 看起来更干净

from pyspark.sql import functions as F
from pyspark.sql.functions import lit, udf

action_process = udf(lambda x: -1 if x=="begin" else 1, IntegerType())

d_final = d.select(F.col("ID"), F.col("user"), (action_process(F.col("ACTION")) * F.col("epoch")).alias("epoch_temp")).groupBy(F.col("ID"), F.col("user")).agg(F.sum(F.col("epoch_temp")).alias("summed_diff"))

结果:

>>> d_final.show()
+------+-----+-----------+
|    ID| user|summed_diff|
+------+-----+-----------+
|789456|user4|     153000|
|712346|user3|       2000|
|133515|user1|     140000|
|789456|user2|       2800|
+------+-----+-----------+


推荐阅读