首页 > 解决方案 > 计算pyspark中每组成对连续行之间的时间差

问题描述

我想计算每个用户每个 SeqID 花费的时间。我有一个这样的数据框。但是,时间会在每个用户的两个操作之间分配,Action_A and Action_B. 每个用户每个 seqID 的总时间将是所有此类对的总和

对于第一个用户,它是5 + 3 [(2019-12-10 10:00:00 - 2019-12-10 10:05:00) + (2019-12-10 10:20:00 - 2019-12-10 10:23:00)]

所以第一个用户理想地花费8 mins了 SeqID 1 (而不是23 mins)。

同样,用户 2 已花费1 + 5 = 6 mins

如何使用 pyspark 进行计算?

data = [(("ID1", 15, "2019-12-10 10:00:00", "Action_A")), 
        (("ID1", 15, "2019-12-10 10:05:00", "Action_B")),
        (("ID1", 15, "2019-12-10 10:20:00", "Action_A")),
        (("ID1", 15, "2019-12-10 10:23:00", "Action_B")),
        (("ID2", 23, "2019-12-10 11:10:00", "Action_A")),
        (("ID2", 23, "2019-12-10 11:11:00", "Action_B")),
        (("ID2", 23, "2019-12-10 11:30:00", "Action_A")),
        (("ID2", 23, "2019-12-10 11:35:00", "Action_B"))]
df = spark.createDataFrame(data, ["ID", "SeqID", "Timestamp", "Action"])
df.show()

+---+-----+-------------------+--------+
| ID|SeqID|          Timestamp|  Action|
+---+-----+-------------------+--------+
|ID1|   15|2019-12-10 10:00:00|Action_A|
|ID1|   15|2019-12-10 10:05:00|Action_B|
|ID1|   15|2019-12-10 10:20:00|Action_A|
|ID1|   15|2019-12-10 10:23:00|Action_B|
|ID2|   23|2019-12-10 11:10:00|Action_A|
|ID2|   23|2019-12-10 11:11:00|Action_B|
|ID2|   23|2019-12-10 11:30:00|Action_A|
|ID2|   23|2019-12-10 11:35:00|Action_B|
+---+-----+-------------------+--------+

一旦我有了每一对的数据,我就可以在整个组中求和(ID,SeqID)

预期输出(也可能是几秒钟)

+---+-----+--------+
| ID|SeqID|Dur_Mins|
+---+-----+--------+
|ID1|   15|       8|
|ID2|   23|       6|
+---+-----+--------+

标签: apache-sparkpysparkapache-spark-sqlpyspark-sql

解决方案


这是使用高阶函数(Spark >=2.4)的可能解决方案:

transform_expr = "transform(ts_array, (x,i) -> (unix_timestamp(ts_array[i+1]) - unix_timestamp(x))/60 * ((i+1)%2))"

df.groupBy("ID", "SeqID").agg(array_sort(collect_list(col("Timestamp"))).alias("ts_array")) \
    .withColumn("transformed_ts_array", expr(transform_expr)) \
    .withColumn("Dur_Mins", expr("aggregate(transformed_ts_array, 0D, (acc, x) -> acc + coalesce(x, 0D))")) \
    .drop("transformed_ts_array", "ts_array") \
    .show(truncate=False)

脚步:

  1. 将所有时间戳收集到每个组的数组中IDSeqID并按升序对它们进行排序
  2. 使用 lambda 函数对数组应用变换(x, i) => Doublex实际元素i及其索引在哪里。对于数组中的每个时间戳,我们计算与下一个时间戳的差异。我们乘以(i+1)%2只有 diff 为 2 per 2 对(第一个与第二个,第三个与第四个,...),因为总是有 2 个动作。
  3. 最后,我们聚合转换的结果数组以对所有元素求和。

输出:

+---+-----+--------+
|ID |SeqID|Dur_Mins|
+---+-----+--------+
|ID1|15   |8.0     |
|ID2|23   |6.0     |
+---+-----+--------+

推荐阅读