Apply an operation by group to time-series data

Problem description

I'm moving from pandas to PySpark because I'm working with a large dataset. My DataFrame looks like this:

+-------------------+---------------+---------------+-------+---------+
|               date|             ip|      dimension|packets|durationM|
+-------------------+---------------+---------------+-------+---------+
|2014-11-04 12:10:00|    192.168.1.1|  1495.91796875|   2365|        6|
|2014-11-04 12:20:00|    192.168.1.2|    384.0859375|    660|        2|
|2014-11-06 13:21:00|    192.168.1.1|  1814.08203125|   2562|       10|
|2014-11-03 15:57:00|    192.168.1.3| 111.0908203125|    192|        1|
|2014-11-05 23:35:00|    192.168.1.1|  1014.66796875|   1968|        6|
|2014-11-06 06:28:00|    192.168.1.8|  395.654296875|    840|       50|
|2014-11-06 05:20:00|    192.168.1.9| 992.8251953125|   1837|        8|
|2014-11-06 04:56:00|   192.168.1.11| 215.8408203125|    636|        1|
|2014-11-06 05:34:00|   192.168.1.15|1544.7392578125|   2605|        0|
|2014-11-07 17:55:00|    192.168.1.1| 189.4345703125|    376|       10|
|2014-11-03 09:54:00|  192.168.1.102|  211.435546875|    587|        3|
|2014-11-07 14:55:00|    192.168.1.2| 2295.666015625|   3900|        4|
|2014-11-07 15:01:00|   192.168.1.68| 172.8955078125|    414|       21|
|2014-11-08 13:45:00|   192.168.1.68|     101.234375|    209|        4|
|2014-11-08 18:36:00|  192.168.1.155| 262.3056640625|    535|        5|
|2014-11-03 21:32:00|  192.168.1.192| 1363.724609375|   2175|        6|
|2014-11-03 06:50:00|   192.168.1.67|  338.142578125|    650|       11|
|2014-11-05 19:56:00|  192.168.1.144| 628.0888671875|    863|       24|
|2014-11-06 16:36:00|   192.168.1.89|1292.9912109375|   2237|        2|
|2014-11-05 19:43:00|   192.168.1.79|  2659.20703125|   3348|       32|
+-------------------+---------------+---------------+-------+---------+

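I need to compute sessions for each ip.
After filling in the missing timestamps for each ip (one row per minute), I have to replace a connection's end date with the last end date reached within its duration. In other words, if I have: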

+-------------------+--------------+--------------+-------+---------+--------------------+
|               date|            ip|     dimension|packets|durationM|             endDate|
+-------------------+--------------+--------------+-------+---------+--------------------+
|2014-11-04 12:10:00|   192.168.1.1| 1495.91796875|   2365|        6| 2014-11-04 12:16:00|
|2014-11-04 12:13:00|   192.168.1.1| 1495.91796875|   2365|        9| 2014-11-04 12:22:00|
+-------------------+--------------+--------------+-------+---------+--------------------+

the result should be:

+-------------------+--------------+--------------+-------+---------+--------------------+
|               date|            ip|     dimension|packets|durationM|             endDate|
+-------------------+--------------+--------------+-------+---------+--------------------+
|2014-11-04 12:10:00|   192.168.1.1| 1495.91796875|   2365|        6| 2014-11-04 12:22:00|
|2014-11-04 12:11:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:11:00|
|2014-11-04 12:12:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:12:00|
|2014-11-04 12:13:00|   192.168.1.1| 1495.91796875|   2365|        9| 2014-11-04 12:13:00|
+-------------------+--------------+--------------+-------+---------+--------------------+  
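A minimal plain-Python sketch of this rule for a single ip. It assumes the connections arrive as a dict from start minute to `durationM` and that the caller chooses the grid bounds; `fill_and_merge`, `grid_start` and `grid_end` are hypothetical names. It fills every minute, then sweeps in time order: a row that starts at or before the open session's current end is absorbed (its `endDate` is reset to its own `date`), and if it runs longer it stretches the end of the row that opened the session. For several ips, it would be run once per ip group.

```python
from datetime import datetime, timedelta

MINUTE = timedelta(minutes=1)


def fill_and_merge(connections, grid_start, grid_end):
    """Per-minute rows for ONE ip, with the session-extended endDate.

    connections: dict {start minute: durationM} (hypothetical input shape).
    Returns (date, durationM, endDate) tuples for every minute in
    [grid_start, grid_end).
    """
    # 1. fill the grid: empty minutes get durationM 0 and endDate == date
    rows = []
    t = grid_start
    while t < grid_end:
        dur = connections.get(t, 0)
        rows.append([t, dur, t + dur * MINUTE])
        t += MINUTE

    # 2. sweep in time order; `opener` indexes the row that opened the session
    opener = None
    for i, (date, dur, end) in enumerate(rows):
        if opener is not None and date <= rows[opener][2]:
            # row starts inside the open session: stretch the session end
            # if this connection runs longer, and reset the row's own endDate
            rows[opener][2] = max(rows[opener][2], end)
            rows[i][2] = date
        elif dur > 0:
            opener = i  # connection after the session end: new session

    return [tuple(r) for r in rows]


day = datetime(2014, 11, 4)
result = fill_and_merge(
    {day.replace(hour=12, minute=10): 6, day.replace(hour=12, minute=13): 9},
    grid_start=day.replace(hour=12, minute=10),
    grid_end=day.replace(hour=12, minute=23),
)
for date, dur, end in result[:4]:
    print(date, dur, end)
```

The four printed rows match the expected result: 12:10 gets `durationM` 6 and end 12:22, while 12:11, 12:12 and 12:13 keep their own timestamp as `endDate`.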

I've done something similar in pandas, but only for a single ip, and I don't know how to handle multiple ips in PySpark. This is how I did it in pandas:

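import pandas as pd

# df holds the rows of ONE ip, indexed by connection start time, with the
# columns dimension, packets, durationM and endDate (start + durationM minutes)
# `inclusive` replaces the `closed` argument, which pandas 2.0 removed
idx = pd.date_range(start='11/4/2014', end='11/5/2014', freq='1min', inclusive='left')

# one row per minute of the day; empty minutes get 0 in the numeric columns
# and their own timestamp as endDate
df = df.reindex(idx)
num_cols = ['dimension', 'packets', 'durationM']
df[num_cols] = df[num_cols].fillna(0)
df['endDate'] = df['endDate'].fillna(df.index.to_series())

df.drop(columns='durationM', inplace=True)
df.reset_index(inplace=True)
df.rename(columns={'index': 'date'}, inplace=True)

listEnd = df.endDate.tolist()

for i in range(len(df.date)):
    # only rows whose end differs from their start open a session
    if df.date[i] != listEnd[i]:
        for n in range(i + 1, len(df.date)):
            # reached the session end on a minute that does not extend it
            if listEnd[i] == df.date[n] and listEnd[i] == listEnd[n]:
                break
            if listEnd[i] >= listEnd[n]:
                # connection n ends inside the session: absorb it
                listEnd[n] = df.date[n]
            else:
                # connection n runs past the session end: extend the session
                listEnd[i] = listEnd[n]
                listEnd[n] = df.date[n]

df['endDate'] = listEnd

# total_seconds() also counts whole days, unlike .dt.seconds
df['sessionDuration'] = (df['endDate'] - df['date']).dt.total_seconds() // 60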
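The `sessionDuration` line turns a time difference into minutes. In pandas, `.dt.seconds` mirrors `datetime.timedelta.seconds`, which holds only the seconds part below one day, whereas `total_seconds()` counts the whole duration. This stdlib sketch shows the difference; `session_minutes` is a hypothetical helper and the day-long session is a made-up example.

```python
from datetime import datetime, timedelta


def session_minutes(start: datetime, end: datetime) -> int:
    """Whole minutes between start and end, including full days."""
    return int((end - start).total_seconds() // 60)


start = datetime(2014, 11, 4, 12, 10)
end = datetime(2014, 11, 4, 12, 22)
print(session_minutes(start, end))  # 12

# hypothetical session lasting one day and five minutes
long_end = start + timedelta(days=1, minutes=5)
print((long_end - start).seconds // 60)  # 5: the day is silently dropped
print(session_minutes(start, long_end))  # 1445
```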

Bonus: if two sessions are more than 10 minutes apart, they are different sessions. For example:

+-------------------+--------------+--------------+-------+---------+--------------------+
|               date|            ip|     dimension|packets|durationM|             endDate|
+-------------------+--------------+--------------+-------+---------+--------------------+
|2014-11-04 12:10:00|   192.168.1.1| 1495.91796875|   2365|        6| 2014-11-04 12:23:00|
|2014-11-04 12:11:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:11:00|
|2014-11-04 12:12:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:12:00|
|2014-11-04 12:13:00|   192.168.1.1| 1495.91796875|   2365|        9| 2014-11-04 12:13:00|
|2014-11-04 12:14:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:14:00|
|2014-11-04 12:15:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:15:00|
|2014-11-04 12:16:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:16:00|
|2014-11-04 12:17:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:17:00|
|2014-11-04 12:18:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:18:00|
|2014-11-04 12:19:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:19:00|
|2014-11-04 12:20:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:20:00|
|2014-11-04 12:21:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:21:00|
|2014-11-04 12:22:00|   192.168.1.1| 1495.91796875|   2365|        0| 2014-11-04 12:22:00|
|2014-11-04 12:23:00|   192.168.1.1| 1495.91796875|   2365|        1| 2014-11-04 12:23:00|
+-------------------+--------------+--------------+-------+---------+--------------------+

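In this case there is only one session (from 12:10 to 12:23), because the next connection starts only one minute after the previous one ends. If more than 10 minutes separate two connections, they belong to two different sessions.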
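A minimal sketch of the bonus rule across several ips, in plain Python: rows are sorted by ip and start time, grouped per ip with `itertools.groupby`, and a connection joins its ip's current session when it starts at most `max_gap` (10 minutes by default) after that session's current end. `sessionize` and `max_gap` are hypothetical names, and the sketch assumes each connection ends at `date + durationM`, so the 12:23 connection with `durationM` 1 closes the session at 12:24 rather than the 12:23 shown in the table; change how `end` is computed if the table's convention is the intended one. The 192.168.1.2 row comes from the first table, and the 12:40 connection is made up to show a split.

```python
from datetime import datetime, timedelta
from itertools import groupby
from operator import itemgetter


def sessionize(rows, max_gap=timedelta(minutes=10)):
    """Split (ip, start, durationM) connections into sessions per ip.

    A connection joins its ip's current session when it starts at most
    `max_gap` after that session's current end; otherwise it opens a new one.
    Returns (ip, session_start, session_end) tuples.
    """
    sessions = []
    ordered = sorted(rows, key=itemgetter(0, 1))  # by ip, then by start
    for ip, group in groupby(ordered, key=itemgetter(0)):
        cur_start = cur_end = None
        for _, start, dur in group:
            end = start + timedelta(minutes=dur)  # assumed connection end
            if cur_end is not None and start - cur_end <= max_gap:
                cur_end = max(cur_end, end)  # same session, maybe longer
            else:
                if cur_end is not None:
                    sessions.append((ip, cur_start, cur_end))
                cur_start, cur_end = start, end
        sessions.append((ip, cur_start, cur_end))  # close the last session
    return sessions


def at(hour, minute):
    return datetime(2014, 11, 4, hour, minute)


connections = [
    ("192.168.1.1", at(12, 10), 6),
    ("192.168.1.1", at(12, 13), 9),
    ("192.168.1.1", at(12, 23), 1),
    ("192.168.1.1", at(12, 40), 2),  # hypothetical: 16 minutes after 12:24
    ("192.168.1.2", at(12, 20), 2),
]
for ip, start, end in sessionize(connections):
    print(ip, start.strftime("%H:%M"), end.strftime("%H:%M"))
# 192.168.1.1 12:10 12:24
# 192.168.1.1 12:40 12:42
# 192.168.1.2 12:20 12:22
```

To run per-ip logic like this in PySpark, one option is `df.groupBy("ip").applyInPandas(func, schema)` (Spark 3.0+), where `func` receives all rows of one ip as a pandas DataFrame. Another, which avoids Python code on the executors, is a window partitioned by `ip` and ordered by `date`: compare each start with the running maximum end of the preceding rows, flag a new session when the gap exceeds 10 minutes, and take a cumulative sum of the flags as a session id. Neither variant is shown or tested here.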

Tags: python, pyspark
