首页 > 解决方案 > 在 agg 中使用 pd.Series.mode

问题描述

我尝试在 agg 中使用 pd.Series.mode 来获取出现次数最多的 source_port 或 destination_port，结果应该是单个值。

我得到的错误是 ValueError: Function does not reduce

# Reproduction of the reported problem. The snippet uses `pd` and `np` without
# importing them here -- presumably `import pandas as pd` and
# `import numpy as np` appear earlier in the asker's file (not shown).

# Sample flow records: six rows, one list per column.
flow = {'date': ['2020-11-13 13:57:51','2020-11-13 13:57:51','2020-11-13 13:57:52','2020-11-13 13:59:53','2020-11-13 13:59:54','2020-11-13 14:01:20'],
        'source_ip': ['192.168.1.1','192.168.1.2','10.0.0.1','192.168.1.1','192.168.1.1', '10.0.0.1'],
        'destination_ip': ['10.0.0.1', '10.0.0.1', '192.168.1.1', '192.168.1.2', '192.168.1.2', '10.0.0.2'],
        'source_bytes':[5,1,2,3,3,1],
        'source_port': [1000,80, 22, 7500, 443, 7000],
        'destination_bytes': [1,2,3,4,5,2],
        'destination_port': [80,22,443,80,80,443]
        }

df = pd.DataFrame(flow, columns=['date', 'source_ip', 'destination_ip', 'source_bytes', 'source_port', 'destination_bytes', 'destination_port'])
# Parse the timestamp strings so the column can be bucketed by pd.Grouper below.
df['date'] = pd.to_datetime(df['date'])
list_of_ports = [80, 22, 443]
# Names of the per-port byte columns created in the loop below.
dynamic_columns = []

# For each port of interest, add two columns that carry the row's byte count
# when the row's source/destination port equals that port, and 0 otherwise.
for port in list_of_ports:
    df[f'source_byte_port_{port}'] = np.where(df.source_port == port, df.source_bytes, 0)
    df[f'destination_byte_port_{port}'] = np.where(df.destination_port == port, df.destination_bytes, 0)
    dynamic_columns.append(f'source_byte_port_{port}')
    dynamic_columns.append(f'destination_byte_port_{port}')

# Per-column aggregation spec passed to .agg() below.
# NOTE: pd.Series.mode returns a Series, which holds several values when a
# group has tied ports; that is the likely cause of the reported
# "ValueError: Function does not reduce".
aggregator = {'source_bytes': ['sum','min','max','mean','count'],
             'destination_bytes': ['sum','min','max','mean','count'],
             'source_port': pd.Series.mode,
             'destination_port': pd.Series.mode
            }

# Each per-port byte column is simply summed within a group.
for i in dynamic_columns:
    aggregator[i] = ['sum']

# NOTE: `idx` is not referenced anywhere else in this snippet.
idx = pd.period_range(min(df.date), max(df.date))
# Width of the time bucket used for grouping.
frequency = '1min'

# melt() keeps every listed column as an id column and stacks the remaining
# ones (source_ip and destination_ip) into a single 'ip' column, so each flow
# appears once per endpoint. The result is grouped by ip and 1-minute bucket.
df2 = (df.melt(['date', 'source_bytes', 'destination_bytes','source_port', 'destination_port'] + dynamic_columns, value_name='ip')
        .groupby(['ip', pd.Grouper(key='date', freq=frequency)])[['source_bytes','destination_bytes', 'source_port','destination_port'] + dynamic_columns]
        .agg(aggregator)
        .reset_index()
        )

标签: python pandas

解决方案


如果你把 pd.Series.mode 替换为 lambda 函数 lambda x: x.value_counts().index[0]，就可以正常工作：

# numpy is required for np.where below; the snippet as posted imported only
# pandas and therefore failed with NameError before reaching the aggregation.
import numpy as np
import pandas as pd

# Sample flow records: six rows, one list per column.
flow = {'date': ['2020-11-13 13:57:51','2020-11-13 13:57:51','2020-11-13 13:57:52','2020-11-13 13:59:53','2020-11-13 13:59:54','2020-11-13 14:01:20'],
        'source_ip': ['192.168.1.1','192.168.1.2','10.0.0.1','192.168.1.1','192.168.1.1', '10.0.0.1'],
        'destination_ip': ['10.0.0.1', '10.0.0.1', '192.168.1.1', '192.168.1.2', '192.168.1.2', '10.0.0.2'],
        'source_bytes':[5,1,2,3,3,1],
        'source_port': [1000,80, 22, 7500, 443, 7000],
        'destination_bytes': [1,2,3,4,5,2],
        'destination_port': [80,22,443,80,80,443]
        }

df = pd.DataFrame(flow, columns=['date', 'source_ip', 'destination_ip', 'source_bytes', 'source_port', 'destination_bytes', 'destination_port'])
# Parse the timestamp strings so the column can be bucketed by pd.Grouper below.
df['date'] = pd.to_datetime(df['date'])
list_of_ports = [80, 22, 443]
# Names of the per-port byte columns created in the loop below.
dynamic_columns = []

# For each port of interest, add two columns that carry the row's byte count
# when the row's source/destination port equals that port, and 0 otherwise.
for port in list_of_ports:
    df[f'source_byte_port_{port}'] = np.where(df.source_port == port, df.source_bytes, 0)
    df[f'destination_byte_port_{port}'] = np.where(df.destination_port == port, df.destination_bytes, 0)
    dynamic_columns.append(f'source_byte_port_{port}')
    dynamic_columns.append(f'destination_byte_port_{port}')


def my_mode(x):
    """Return the most frequent value of *x* as a single scalar.

    Unlike ``pd.Series.mode``, which returns a Series (several values when
    there is a tie), this always reduces a group to one value, so it can be
    used inside ``groupby(...).agg``.

    When several values share the highest count, the first entry of
    ``value_counts()`` is returned. If *x* has no non-null values, NaN is
    returned instead of raising ``IndexError``.
    """
    counts = x.value_counts()
    if counts.empty:
        return np.nan
    return counts.index[0]


# Per-column aggregation spec; the ('mode', my_mode) tuple names the
# resulting sub-column 'mode'.
aggregator = {'source_bytes': ['sum','min','max','mean','count'],
             'destination_bytes': ['sum','min','max','mean','count'],
             'source_port': [('mode', my_mode)],
             'destination_port': [('mode', my_mode)]
            }

# Each per-port byte column is simply summed within a group.
for i in dynamic_columns:
    aggregator[i] = ['sum']

# NOTE: `idx` is not referenced by the aggregation below; it is kept only
# because the original snippet defined it.
idx = pd.period_range(min(df.date), max(df.date))
# Width of the time bucket used for grouping.
frequency = '1min'

# melt() keeps every listed column as an id column and stacks the remaining
# ones (source_ip and destination_ip) into a single 'ip' column, so each flow
# appears once per endpoint. The result is grouped by ip and 1-minute bucket.
df2 = (df.melt(['date', 'source_bytes', 'destination_bytes','source_port', 'destination_port'] + dynamic_columns, value_name='ip')
        .groupby(['ip', pd.Grouper(key='date', freq=frequency)])[['source_bytes','destination_bytes', 'source_port','destination_port'] + dynamic_columns]
        .agg(aggregator)
        .reset_index()
        )

print(df2)

推荐阅读