python - 将功能应用于每个组,其中组被拆分为多个文件,而不连接所有文件
问题描述
我的数据来自 BigQuery,以 CSV 文件的形式导出到 GCS 存储桶,如果文件很大,BigQuery 会自动将数据拆分成几个块。考虑到时间序列,时间序列可能分散在不同的文件中。我有一个自定义函数,我想将其应用于每个TimeseriesID
.
这是数据的一些约束:
- 数据按
TimeseriesID
和排序TimeID
- 每个文件的行数可能会有所不同,但至少有 1 行(这不太可能)
- 的开始
TimeID
并不总是0 - 每个时间序列的长度可能会有所不同,但最多只会分散在 2 个文件中。没有时间序列分散在 3 个不同的文件中。
这是说明问题的初始设置:
# Please take note this is just for simplicity. The actual goal is not to calculate mean for all group, but to apply a custom_func to each Timeseries ID
def custom_func(x):
return np.mean(x)
# Please take note this is just for simplicity. In actual, I read the file one by one since reading all the data is not possible
df1 = pd.DataFrame({"TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30]})
df2 = pd.DataFrame({"TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30]})
df3 = pd.DataFrame({"TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30]})
如果我可以只处理所有文件,这应该是微不足道concat
的,但问题是如果我concat
所有的数据帧都无法放入内存中。
我想要的输出应该与此类似,但没有concat
所有文件。
pd.concat([df1,df2,df3],axis=0).groupby('TimeseriesID').agg({"value":simple_func})
我也知道vaex
,dask
但我想暂时坚持使用简单的熊猫。我也愿意接受涉及修改 BigQuery 以更好地拆分文件的解决方案。
解决方案
op 提出的将 concat 与数百万条记录一起使用的方法对于内存/其他资源来说太过分了。
我已经使用 Google Colab Nootebooks 测试了 OP 代码,这是一种不好的方法
import pandas as pd
import numpy as np
import time
# Please take note this is just for simplicity. The actual goal is not to calculate mean for all group, but to apply a custom_func to each Timeseries ID
def custom_func(x):
return np.mean(x)
# Please take note this is just for simplicity. In actual, I read the file one by one since reading all the data is not possible
df1 = pd.DataFrame({"TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30]})
df2 = pd.DataFrame({"TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30]})
df3 = pd.DataFrame({"TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30]})
start = time.time()
df = pd.concat([df1,df2,df3]).groupby('TimeseriesID').agg({"value":custom_func})
elapsed = (time.time() - start)
print(elapsed)
print(df.head())
输出将是:
0.023952960968017578
value
TimeseriesID A 11.666667
B 16.250000
C 20.000000
D 18.333333
如您所见,“concat”需要时间来处理。由于很少有记录,这没有被察觉。方法应该如下:
- 获取包含您要处理的数据的文件。即:只有可行的列。
- 从已处理的文件键和值创建字典。如有必要,获取必要文件中每个键的值。您可以将结果以 json/csv 格式存储在“结果”目录中:
A.csv 将具有所有关键的“A”值... n.csv 将具有所有关键的“n”值
- 遍历结果目录并开始在字典中构建最终输出。
{'A': [10, 20, 5], 'B': [30, 10, 20, 5], 'C': [30, 10], 'D': [20, 5, 30]}
- 将自定义函数应用于每个键值列表。
{'A':11.666666666666666,'B':16.25,'C':20.0,'D':18.333333333333332}
您可以使用以下代码检查逻辑,我使用 json 来存储数据:
from google.colab import files
import json
import pandas as pd
#initial dataset
df1 = pd.DataFrame({"TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30]})
df2 = pd.DataFrame({"TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30]})
df3 = pd.DataFrame({"TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30]})
#get unique keys and its values
df1.groupby('TimeseriesID')['value'].apply(list).to_json('df1.json')
df2.groupby('TimeseriesID')['value'].apply(list).to_json('df2.json')
df3.groupby('TimeseriesID')['value'].apply(list).to_json('df3.json')
#as this is an example you can download the output as jsons
files.download('df1.json')
files.download('df2.json')
files.download('df3.json')
2021 年6 月 10 日更新 我已经针对 OP 需求调整了代码。这部分创建精炼文件。
from google.colab import files
import json
#you should use your own function to get the data from the file
def retrieve_data(uploaded,file):
return json.loads(uploaded[file].decode('utf-8'))
#you should use your own function to get a list of files to process
def retrieve_files():
return files.upload()
key_list =[]
#call a function that gets a list of files to process
file_to_process = retrieve_files()
#read every raw file:
for file in file_to_process:
file_data = retrieve_data(file_to_process,file)
for key,value in file_data.items():
if key not in key_list:
key_list.append(key)
with open(f'{key}.json','w') as new_key_file:
new_json = json.dumps({key:value})
new_key_file.write(new_json)
else:
with open(f'{key}.json','r+') as key_file:
raw_json = key_file.read()
old_json = json.loads(raw_json)
new_json = json.dumps({key:old_json[key]+value})
key_file.seek(0)
key_file.write(new_json)
for key in key_list:
files.download(f'{key}.json')
print(key_list)
更新 07/10/2021 我更新了代码以避免混淆。这部分加工精制文件。
import time
import numpy as np
#Once we get the refined values we can use it to apply custom functions
def custom_func(x):
return np.mean(x)
#Get key and data content from single json
def get_data(file_data):
content = file_data.popitem()
return content[0],content[1]
#load key list and build our refined dictionary
refined_values = []
#call a function that gets a list of files to process
file_to_process = retrieve_files()
start = time.time()
#read every refined file:
for file in file_to_process:
#read content of file n
file_data = retrieve_data(file_to_process,file)
#parse and apply function per file read
key,data = get_data(file_data)
func_output = custom_func(data)
#start building refined list
refined_values.append([key,func_output])
elapsed = (time.time() - start)
print(elapsed)
df = pd.DataFrame.from_records(refined_values,columns=['TimerSeriesID','value']).sort_values(by=['TimerSeriesID'])
df = df.reset_index(drop=True)
print(df.head())
输出将是:
0.00045609474182128906
TimerSeriesID value
0 A 11.666667
1 B 16.250000
2 C 20.000000
3 D 18.333333
总结:
在处理大型数据集时,您应该始终关注您将要使用的数据并保持最小化。仅使用可行的值。
当操作由基本运算符或 python 本机库执行时,处理时间会更快。
推荐阅读
- javascript - 带有 Laravel 和 Vue 的 SPA 导航栏
- javascript - 为什么在调用后的开玩笑测试中无法读取未定义的属性'then'?
- python - RetrieveAPIView 生成错误“函数”对象没有属性“get_extra_actions”
- html - 悬停在链接上时如何获得 EpicGame 的导航栏转换?
- git - 如何将新代码推送到现有仓库?(作为一个新的分支)
- google-sheets - 如何从有空行的四列范围内的行中获取更高的值
- javafx - 场景生成器中缺少字体
- c - 在 dup2() 和 fork 之后无法从管道中读取。C
- docker - Docker compose:在不删除容器的情况下更改 docker compose 文件
- javascript - 使用 Leaflet 显示地理文件的长度