apache-spark - 如何根据pyspark中的条件组合dataFrame中的行
问题描述
我必须为应用程序处理包含日志(进入和退出)的数据框。数据如下:
USER | DATETIME | IN_OUT
---------------------------------
0002 2018/08/28 12:00 IN
0002 2018/08/28 12:20 OUT
0003 2018/08/28 13:00 IN
0003 2018/08/28 14:20 OUT
0003 2018/08/28 15:00 IN
0003 2018/08/28 16:00 OUT
如何将包含 2 个会话的行组合起来产生
USER | DATETIMEIN | DATETIMEOUT | SESSIONTIME[Minutes]
-------------------------------------------------------
0002 2018/08/28 12:00 2018/08/28 12:20 20
0003 2018/08/28 13:00 2018/08/28 14:30 90
0003 2018/08/28 15:00 2018/08/28 16:00 60
解决方案
如果您可以确保 IN 始终跟在 OUT 事件之后,则可以使用以下代码(我包括了对 IN 和 OUT 的检查,但如果 IN 和 OUT 不交替,它将不起作用)。
from pyspark.sql.window import Window as W
test_df = spark.createDataFrame([
(2,datetime.datetime(2018,8,28,12,00), "IN"),(2,datetime.datetime(2018,8,28,12,20), "OUT"),(3,datetime.datetime(2018,8,28,13,00), "IN"),(3,datetime.datetime(2018,8,28,14,20), "OUT"),(3,datetime.datetime(2018,8,28,15,00), "IN"),(3,datetime.datetime(2018,8,28,16,00), "OUT")
], ("USER", "DATETIME", "IN_OUT")) # creation of Dataframe
w = W.partitionBy("USER").orderBy("DATETIME") #order by datetime and process every user separately
get_in= when((lag("IN_OUT", 1).over(w) == "IN") & (col("IN_OUT")=="OUT"), lag("DATETIME",1).over(w)).otherwise(None) # apply the window and if the previous event was IN preserve the time
test_df.withColumn("DATETIMEIN",get_in.cast("timestamp")).withColumn("DATETIMEOUT",col("DATETIME")).filter((col("DATETIMEIN").isNotNull())).withColumn("SESSIONTIME[Minutes]",(col("DATETIME").cast("long")-col("DATETIMEIN").cast("long"))/60).select("USER","DATETIMEIN", "DATETIMEOUT", "SESSIONTIME[Minutes]").show() #apply the function and compute the difference to previous IN_TIME
结果:
+----+-------------------+-------------------+--------------------+
|USER| DATETIMEIN| DATETIMEOUT|SESSIONTIME[Minutes]|
+----+-------------------+-------------------+--------------------+
| 3|2018-08-28 13:00:00|2018-08-28 14:20:00| 80.0|
| 3|2018-08-28 15:00:00|2018-08-28 16:00:00| 60.0|
| 2|2018-08-28 12:00:00|2018-08-28 12:20:00| 20.0|
+----+-------------------+-------------------+--------------------+
推荐阅读
- java - 查找两个字符串的交集,其中返回两个字符串中出现的字符(与第一个字符串的序列顺序相同)
- android - 迁移到androidX后约束布局视图向右跳转
- node.js - 为什么不能得到查询结果
- mysql - 为什么 max() 只给我该列中的最大值
- php - 如何从 codegnitor 中的数组“data”键获取数据?
- mysql - 我在本地机器上安装了 MySQL Community Edition 8.0.17
- asp.net-core - 无法将 Azure Function App 与数据库连接(使用 .net core 2.1)
- ruby - 如何在 Ruby 中读取证书详细信息?
- github - 如何访问 Github Actions CI/CD 中的服务?
- java - 如何使用 SQLite "IN" 运算符更新多行?