python - 在 agg 中使用 pd.Series.mode
问题描述
我尝试在 agg 中使用 pd.Series.mode 以获得最常用的 source_port 或destination_port。那应该是一个值
我得到的错误是ValueError: Function does not reduce
flow = {'date': ['2020-11-13 13:57:51','2020-11-13 13:57:51','2020-11-13 13:57:52','2020-11-13 13:59:53','2020-11-13 13:59:54','2020-11-13 14:01:20'],
'source_ip': ['192.168.1.1','192.168.1.2','10.0.0.1','192.168.1.1','192.168.1.1', '10.0.0.1'],
'destination_ip': ['10.0.0.1', '10.0.0.1', '192.168.1.1', '192.168.1.2', '192.168.1.2', '10.0.0.2'],
'source_bytes':[5,1,2,3,3,1],
'source_port': [1000,80, 22, 7500, 443, 7000],
'destination_bytes': [1,2,3,4,5,2],
'destination_port': [80,22,443,80,80,443]
}
df = pd.DataFrame(flow, columns=['date', 'source_ip', 'destination_ip', 'source_bytes', 'source_port', 'destination_bytes', 'destination_port'])
df['date'] = pd.to_datetime(df['date'])
list_of_ports = [80, 22, 443]
dynamic_columns = []
for port in list_of_ports:
df[f'source_byte_port_{port}'] = np.where(df.source_port == port, df.source_bytes, 0)
df[f'destination_byte_port_{port}'] = np.where(df.destination_port == port, df.destination_bytes, 0)
dynamic_columns.append(f'source_byte_port_{port}')
dynamic_columns.append(f'destination_byte_port_{port}')
aggregator = {'source_bytes': ['sum','min','max','mean','count'],
'destination_bytes': ['sum','min','max','mean','count'],
'source_port': pd.Series.mode,
'destination_port': pd.Series.mode
}
for i in dynamic_columns:
aggregator[i] = ['sum']
idx = pd.period_range(min(df.date), max(df.date))
frequency = '1min'
df2 = (df.melt(['date', 'source_bytes', 'destination_bytes','source_port', 'destination_port'] + dynamic_columns, value_name='ip')
.groupby(['ip', pd.Grouper(key='date', freq=frequency)])[['source_bytes','destination_bytes', 'source_port','destination_port'] + dynamic_columns]
.agg(aggregator)
.reset_index()
)
解决方案
如果你用pd.Series.mode
lambda 函数替换它就可以工作lambda x:x.value_counts().index[0]
:
import pandas as pd
flow = {'date': ['2020-11-13 13:57:51','2020-11-13 13:57:51','2020-11-13 13:57:52','2020-11-13 13:59:53','2020-11-13 13:59:54','2020-11-13 14:01:20'],
'source_ip': ['192.168.1.1','192.168.1.2','10.0.0.1','192.168.1.1','192.168.1.1', '10.0.0.1'],
'destination_ip': ['10.0.0.1', '10.0.0.1', '192.168.1.1', '192.168.1.2', '192.168.1.2', '10.0.0.2'],
'source_bytes':[5,1,2,3,3,1],
'source_port': [1000,80, 22, 7500, 443, 7000],
'destination_bytes': [1,2,3,4,5,2],
'destination_port': [80,22,443,80,80,443]
}
df = pd.DataFrame(flow, columns=['date', 'source_ip', 'destination_ip', 'source_bytes', 'source_port', 'destination_bytes', 'destination_port'])
df['date'] = pd.to_datetime(df['date'])
list_of_ports = [80, 22, 443]
dynamic_columns = []
for port in list_of_ports:
df[f'source_byte_port_{port}'] = np.where(df.source_port == port, df.source_bytes, 0)
df[f'destination_byte_port_{port}'] = np.where(df.destination_port == port, df.destination_bytes, 0)
dynamic_columns.append(f'source_byte_port_{port}')
dynamic_columns.append(f'destination_byte_port_{port}')
my_mode = lambda x:x.value_counts().index[0]
aggregator = {'source_bytes': ['sum','min','max','mean','count'],
'destination_bytes': ['sum','min','max','mean','count'],
'source_port': [('mode', my_mode)],
'destination_port': [('mode', my_mode)]
}
for i in dynamic_columns:
aggregator[i] = ['sum']
idx = pd.period_range(min(df.date), max(df.date))
frequency = '1min'
df2 = (df.melt(['date', 'source_bytes', 'destination_bytes','source_port', 'destination_port'] + dynamic_columns, value_name='ip')
.groupby(['ip', pd.Grouper(key='date', freq=frequency)])[['source_bytes','destination_bytes', 'source_port','destination_port'] + dynamic_columns]
.agg(aggregator)
.reset_index()
)
print(df2)
推荐阅读
- python-3.x - 文本建议 API 中以毫秒为单位的烧瓶限制器作为用户类型
- kotlin - 在协程之后从函数返回一个值
- amazon-web-services - 为什么即使使用 DependsOn,CloudFormation API 网关也无法说明“REST API 不包含任何方法”?
- python - 根据特定条件应用拆分
- php - 根据缺货状态和一天中的时间在购物车页面上显示 div
- sql - 检查一个表中的多列与另一表中的一列
- vue.js - 如何在 Nuxt.js 的 v-for 循环中使用 SVG 生成?
- sql - SQL Server - 舍入问题
- bash - 用于检查 Postgres 数据库中的 JDBC URL 的 Bash 命令
- c++ - 从最高有效位开始的第 N 个数字