python-3.x - pyspark:查找有条件的行之间的时间差异
问题描述
您好我正在尝试计算某些列中的值之间的差异总和(以毫秒为单位),这些值取决于另一列的值。
更详细地说,我有以下 pyspark 数据框:
d = spark.createDataFrame(
[(133515, "user1", 1562889600046, 'begin'),
(789456, "user2", 1562889600246, 'begin'),
(789456, "user2", 1562889603046, 'end'),
(712346, "user3", 1562889600046, 'begin'),
(789456, "user4", 1562889700046, 'begin'),
(133515, "user1", 1562889640046, 'end'),
(712346, "user3", 1562889602046, 'end'),
(789456, "user4", 1562889800046, 'end'),
(789456, "user4", 1562889850046, 'begin'),
(789456, "user4", 1562889903046, 'end'),
(133515, "user1", 1562889645046, 'begin'),
(133515, "user1", 1562889745046, 'end')
], ("ID", "user", "epoch", "ACTION"))
d.show()
我希望得到以下输出:
+------+-----+-----------+
| ID| user|summed diff|
+------+-----+-----------+
|133515|user1| 50000|
|789456|user2| 2800|
|712346|user3| 2000|
|789456|user4| 153000|
+------+-----+-----------+
列中的每个值summed diff
都是通过将“结束”时期和与该特定用户相关的最后一个“开始”之间的毫秒差求和获得的。
你能指导我如何解决这个问题吗?
如果我想按一天或一天中的小时分组怎么办?
解决方案
试试这个:
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
d_final = d.select(F.col("ID"), F.col("user"), F.when(F.col("ACTION") == lit("begin"), -F.col("epoch")).otherwise(F.col("epoch")).alias("epoch_temp")).groupBy(F.col("ID"), F.col("user")).agg(F.sum(F.col("epoch_temp")).alias("summed_diff"))
结果:
>>> d_final.show()
+------+-----+-----------+
| ID| user|summed_diff|
+------+-----+-----------+
|789456|user4| 153000|
|712346|user3| 2000|
|133515|user1| 140000|
|789456|user2| 2800|
+------+-----+-----------+
编辑- 使用 udf 看起来更干净
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, udf
action_process = udf(lambda x: -1 if x=="begin" else 1, IntegerType())
d_final = d.select(F.col("ID"), F.col("user"), (action_process(F.col("ACTION")) * F.col("epoch")).alias("epoch_temp")).groupBy(F.col("ID"), F.col("user")).agg(F.sum(F.col("epoch_temp")).alias("summed_diff"))
结果:
>>> d_final.show()
+------+-----+-----------+
| ID| user|summed_diff|
+------+-----+-----------+
|789456|user4| 153000|
|712346|user3| 2000|
|133515|user1| 140000|
|789456|user2| 2800|
+------+-----+-----------+
推荐阅读
- javascript - 你如何使 jest.mock(file) 动态化?
- java - 我怎样才能为 Java 的 UVA 在线问题输入字符串?需要的详细信息
- javascript - 如何使用 JavaScript 在不更改锚标记的 Href 中的路径的情况下更改目标域?
- windows - 使用环境变量 / HOME 加载 puttykeyfile
- node.js - 如何转换一组值,使每个值更接近均值,但在 PySpark 中具有相似形状的分布(即减少标准差)
- angular - 如何使用 Angular + Dotnet 核心应用程序防止并发用户登录?
- php - Laravel 使用 File 方法从文件夹中获取文件名
- html - 无法使用 pug js 添加图像
- python - 如何在 python 中使用 loc[]
- windows - Shell - 循环中的回声返回奇怪