python - 对时间序列数据按组应用操作
问题描述
我正在从 pandas 切换到 pySpark,因为我正在处理一个大型数据集。我正在研究如下数据框:
+-------------------+---------------+---------------+-------+---------+
| date| ip| dimension|packets|durationM|
+-------------------+---------------+---------------+-------+---------+
|2014-11-04 12:10:00| 192.168.1.1| 1495.91796875| 2365| 6|
|2014-11-04 12:20:00| 192.168.1.2| 384.0859375| 660| 2|
|2014-11-06 13:21:00| 192.168.1.1| 1814.08203125| 2562| 10|
|2014-11-03 15:57:00| 192.168.1.3| 111.0908203125| 192| 1|
|2014-11-05 23:35:00| 192.168.1.1| 1014.66796875| 1968| 6|
|2014-11-06 06:28:00| 192.168.1.8| 395.654296875| 840| 50|
|2014-11-06 05:20:00| 192.168.1.9| 992.8251953125| 1837| 8|
|2014-11-06 04:56:00| 192.168.1.11| 215.8408203125| 636| 1|
|2014-11-06 05:34:00| 192.168.1.15|1544.7392578125| 2605| 0|
|2014-11-07 17:55:00| 192.168.1.1| 189.4345703125| 376| 10|
|2014-11-03 09:54:00| 192.168.1.102| 211.435546875| 587| 3|
|2014-11-07 14:55:00| 192.168.1.2| 2295.666015625| 3900| 4|
|2014-11-07 15:01:00| 192.168.1.68| 172.8955078125| 414| 21|
|2014-11-08 13:45:00| 192.168.1.68| 101.234375| 209| 4|
|2014-11-08 18:36:00| 192.168.1.155| 262.3056640625| 535| 5|
|2014-11-03 21:32:00| 192.168.1.192| 1363.724609375| 2175| 6|
|2014-11-03 06:50:00| 192.168.1.67| 338.142578125| 650| 11|
|2014-11-05 19:56:00| 192.168.1.144| 628.0888671875| 863| 24|
|2014-11-06 16:36:00| 192.168.1.89|1292.9912109375| 2237| 2|
|2014-11-05 19:43:00| 192.168.1.79| 2659.20703125| 3348| 32|
+-------------------+---------------+---------------+-------+---------+
我必须为每个 ip 计算会话:
在为每个 ip 填写缺失日期后,我必须将结束日期替换为给定持续时间的最后一个结束日期。换句话说,如果我有:
+-------------------+--------------+--------------+-------+---------+--------------------+
| date| ip| dimension|packets|durationM| endDate|
+-------------------+--------------+--------------+-------+---------+--------------------+
|2014-11-04 12:10:00| 192.168.1.1| 1495.91796875| 2365| 6| 2014-11-04 12:16:00|
|2014-11-04 12:13:00| 192.168.1.1| 1495.91796875| 2365| 9| 2014-11-04 12:22:00|
结果应该是:
+-------------------+--------------+--------------+-------+---------+--------------------+
| date| ip| dimension|packets|durationM| endDate|
+-------------------+--------------+--------------+-------+---------+--------------------+
|2014-11-04 12:10:00| 192.168.1.1| 1495.91796875| 2365| 6| 2014-11-04 12:22:00|
|2014-11-04 12:11:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:11:00|
|2014-11-04 12:12:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:12:00|
|2014-11-04 12:13:00| 192.168.1.1| 1495.91796875| 2365| 9| 2014-11-04 12:13:00|
+-------------------+--------------+--------------+-------+---------+--------------------+
我已经在 pandas 中做了类似的事情,但只针对一个 ip,而且我不知道如何在 PySpark 中使用多个 ip。这就是我在熊猫中的做法:
idx = pd.date_range(start='11/4/2014', end='11/5/2014', freq = '1min', closed='left')
df = df.reindex(idx, fill_value=0)
df.drop(columns = 'duration', inplace=True)
df.reset_index(inplace=True)
df.rename(columns={'index':'date'}, inplace=True)
listEnd=df.endDate.tolist()
for i, v in enumerate(df.date):
if (df.date[i] != listEnd[i]):
for n in range(i+1, len(df.date)):
if(listEnd[i]==df.date[n]):
if(listEnd[i] == listEnd[n]):
break
if(listEnd[i]>=listEnd[n]):
listEnd[n] = df.date[n]
else:
listEnd[i] = listEnd[n]
listEnd[n] = df.date[n]
df['endDate'] = listEnd
df['sessionDuration'] = ((df['endDate']-df['date']).dt.seconds)//60
奖励部分 - 如果两个会话分开超过 10 分钟,则不同的会话:
+-------------------+--------------+--------------+-------+---------+--------------------+
| date| ip| dimension|packets|durationM| endDate|
+-------------------+--------------+--------------+-------+---------+--------------------+
|2014-11-04 12:10:00| 192.168.1.1| 1495.91796875| 2365| 6| 2014-11-04 12:23:00|
|2014-11-04 12:11:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:11:00|
|2014-11-04 12:12:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:12:00|
|2014-11-04 12:13:00| 192.168.1.1| 1495.91796875| 2365| 9| 2014-11-04 12:13:00|
|2014-11-04 12:14:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:14:00|
|2014-11-04 12:15:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:15:00|
|2014-11-04 12:16:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:16:00|
|2014-11-04 12:17:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:17:00|
|2014-11-04 12:18:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:18:00|
|2014-11-04 12:19:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:19:00|
|2014-11-04 12:20:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:20:00|
|2014-11-04 12:21:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:21:00|
|2014-11-04 12:22:00| 192.168.1.1| 1495.91796875| 2365| 0| 2014-11-04 12:22:00|
|2014-11-04 12:23:00| 192.168.1.1| 1495.91796875| 2365| 1| 2014-11-04 12:23:00|
+-------------------+--------------+--------------+-------+---------+--------------------+
在这种情况下,会话只有一个(从 12:10 到 12:23),因为第二个连接仅比第一个连接晚一分钟。如果两个连接之间的时间超过 10 分钟,它们将是两个不同的会话。
解决方案
推荐阅读
- android - 更改状态时如何在底部表中添加动画
- docusaurus - 在 docusaurus 中添加嵌入模式
- json - 如何管理地图
> 在 Dart/Flutter 中? - node.js - 如何在 Express/Mongo 中获取特定用户的所有帖子列表?
- recursion - RecursionError:Odoo 13 中超出了最大递归深度
- google-app-engine - 为什么我不能覆盖我的 Google Cloud Build 的超时?
- linux - 在 bash 中运行 while 循环时二进制运算符预期错误
- reactjs - 无法在未安装的组件上执行 React 状态更新并且状态没有得到更新
- javascript - Bootstrap 悬停效果在 IE 11 中未按预期工作
- java - 获取和释放 java 监视器锁(同步块、重入锁等)是否需要上下文切换到内核空间?