apache-spark - 计算pyspark中每组成对连续行之间的时间差
问题描述
我想计算每个用户每个 SeqID 花费的时间。我有一个这样的数据框。但是,时间会在每个用户的两个操作之间分配,Action_A and Action_B.
每个用户每个 seqID 的总时间将是所有此类对的总和
对于第一个用户,它是5 + 3 [(2019-12-10 10:00:00 - 2019-12-10 10:05:00) + (2019-12-10 10:20:00 - 2019-12-10 10:23:00)]
所以第一个用户理想地花费8 mins
了 SeqID 1 (而不是23 mins
)。
同样,用户 2 已花费1 + 5 = 6 mins
如何使用 pyspark 进行计算?
data = [(("ID1", 15, "2019-12-10 10:00:00", "Action_A")),
(("ID1", 15, "2019-12-10 10:05:00", "Action_B")),
(("ID1", 15, "2019-12-10 10:20:00", "Action_A")),
(("ID1", 15, "2019-12-10 10:23:00", "Action_B")),
(("ID2", 23, "2019-12-10 11:10:00", "Action_A")),
(("ID2", 23, "2019-12-10 11:11:00", "Action_B")),
(("ID2", 23, "2019-12-10 11:30:00", "Action_A")),
(("ID2", 23, "2019-12-10 11:35:00", "Action_B"))]
df = spark.createDataFrame(data, ["ID", "SeqID", "Timestamp", "Action"])
df.show()
+---+-----+-------------------+--------+
| ID|SeqID| Timestamp| Action|
+---+-----+-------------------+--------+
|ID1| 15|2019-12-10 10:00:00|Action_A|
|ID1| 15|2019-12-10 10:05:00|Action_B|
|ID1| 15|2019-12-10 10:20:00|Action_A|
|ID1| 15|2019-12-10 10:23:00|Action_B|
|ID2| 23|2019-12-10 11:10:00|Action_A|
|ID2| 23|2019-12-10 11:11:00|Action_B|
|ID2| 23|2019-12-10 11:30:00|Action_A|
|ID2| 23|2019-12-10 11:35:00|Action_B|
+---+-----+-------------------+--------+
一旦我有了每一对的数据,我就可以在整个组中求和(ID,SeqID)
预期输出(也可能是几秒钟)
+---+-----+--------+
| ID|SeqID|Dur_Mins|
+---+-----+--------+
|ID1| 15| 8|
|ID2| 23| 6|
+---+-----+--------+
解决方案
这是使用高阶函数(Spark >=2.4)的可能解决方案:
transform_expr = "transform(ts_array, (x,i) -> (unix_timestamp(ts_array[i+1]) - unix_timestamp(x))/60 * ((i+1)%2))"
df.groupBy("ID", "SeqID").agg(array_sort(collect_list(col("Timestamp"))).alias("ts_array")) \
.withColumn("transformed_ts_array", expr(transform_expr)) \
.withColumn("Dur_Mins", expr("aggregate(transformed_ts_array, 0D, (acc, x) -> acc + coalesce(x, 0D))")) \
.drop("transformed_ts_array", "ts_array") \
.show(truncate=False)
脚步:
- 将所有时间戳收集到每个组的数组中
ID
,SeqID
并按升序对它们进行排序 - 使用 lambda 函数对数组应用变换
(x, i) => Double
。x
实际元素i
及其索引在哪里。对于数组中的每个时间戳,我们计算与下一个时间戳的差异。我们乘以(i+1)%2
只有 diff 为 2 per 2 对(第一个与第二个,第三个与第四个,...),因为总是有 2 个动作。 - 最后,我们聚合转换的结果数组以对所有元素求和。
输出:
+---+-----+--------+
|ID |SeqID|Dur_Mins|
+---+-----+--------+
|ID1|15 |8.0 |
|ID2|23 |6.0 |
+---+-----+--------+
推荐阅读
- http - 如何使这个 HTTP 请求示例工作?
- delphi - 当 TGridPanel 的单元格具有较大的 BorderWidth 时,如何从它获取真正的 ClientRect?
- java - Grails 4.X 可以运行哪些 Java 版本?
- r - ggplot中多列的饼图
- graphql - graphQL + gatsby:查询 Image 或 mp4 的字段
- java - 蓝牙未与其他设备自动配对
- c# - Web 应用程序用户身份验证的最安全方式
- javascript - 提交谷歌表单时电子邮件发送器出现问题
- c++ - 多线程 gtkmm 应用程序的最简单示例
- regex - 为什么我的正则表达式在 PowerShell 中不起作用?