Apply a function to each group where the groups are split across multiple files, without concatenating all the files

Problem description

My data comes from BigQuery, exported to a GCS bucket as CSV files; when the export is large, BigQuery automatically splits the data into several chunks. Since the data is a time series, a single time series may end up scattered across different files. I have a custom function that I want to apply to each TimeseriesID.

Here are some constraints on the data:

Here is the initial setup that illustrates the problem:

import numpy as np
import pandas as pd

# Please take note this is just for simplicity. The actual goal is not to calculate the mean for every group, but to apply a custom_func to each TimeseriesID
def custom_func(x):
    return np.mean(x)

# Please take note this is just for simplicity. In reality, I read the files one by one since reading all the data at once is not possible
df1 = pd.DataFrame({"TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30]})
df2 = pd.DataFrame({"TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30]})
df3 = pd.DataFrame({"TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30]})

If I could simply process all the files at once, this would be trivial with concat, but the problem is that if I concat all the dataframes they do not fit in memory.

The output I want should be similar to this, but without concatenating all the files:

pd.concat([df1,df2,df3],axis=0).groupby('TimeseriesID').agg({"value":custom_func}) 

I am also aware of vaex and dask, but for now I would like to stick with plain pandas. I am also open to solutions that involve modifying the BigQuery export so that the files are split in a better way.

Tags: python, pandas, google-bigquery

Solution


The approach proposed by the OP, using concat over millions of records, is too demanding on memory and other resources.

I tested the OP's code using Google Colab Notebooks, and it is indeed a bad approach:

import pandas as pd
import numpy as np
import time

# Please take note this is just for simplicity. The actual goal is not to calculate the mean for every group, but to apply a custom_func to each TimeseriesID

def custom_func(x):
    return np.mean(x)

# Please take note this is just for simplicity. In actual, I read the file one by one since reading all the data is not possible
df1 = pd.DataFrame({"TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30]})
df2 = pd.DataFrame({"TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30]})
df3 = pd.DataFrame({"TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30]})

start = time.time()
df = pd.concat([df1,df2,df3]).groupby('TimeseriesID').agg({"value":custom_func})
elapsed = (time.time() - start)

print(elapsed)
print(df.head())

The output will be:

0.023952960968017578
                  value
TimeseriesID           
A             11.666667
B             16.250000
C             20.000000
D             18.333333

As you can see, concat takes time to process; with so few records it goes unnoticed, but at scale it adds up. The approach should instead be the following:

  1. Get only the files that contain the data you actually need, i.e. only the relevant columns.
  2. Build a dictionary of keys and values from each processed file, collecting the values for every key across the relevant files. You can store the results in json/csv format in a "results" directory:

A.csv will hold all the values for key 'A' ... n.csv will hold all the values for key 'n'

  3. Iterate over the results directory and start building the final output as a dictionary:

{'A': [10, 20, 5], 'B': [30, 10, 20, 5], 'C': [30, 10], 'D': [20, 5, 30]}

  4. Apply the custom function to each key's list of values (a short sketch of steps 3-4 follows this list):

{'A':11.666666666666666,'B':16.25,'C':20.0,'D':18.333333333333332}
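
As a rough sketch of steps 3 and 4, assuming the merged dictionary already fits in memory (in the real flow each key's list would be read back from its own result file rather than hard-coded as below), the final computation boils down to:

import numpy as np

# Hypothetical illustration of steps 3-4: merge the per-key lists into one
# dictionary, then apply the custom function to every key.
merged = {'A': [10, 20, 5], 'B': [30, 10, 20, 5], 'C': [30, 10], 'D': [20, 5, 30]}

def custom_func(x):
    return np.mean(x)

refined = {key: float(custom_func(values)) for key, values in merged.items()}
print(refined)
# {'A': 11.666666666666666, 'B': 16.25, 'C': 20.0, 'D': 18.333333333333332}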

You can check the logic with the following code; I used json to store the data:

from google.colab import files
import json
import pandas as pd

#initial dataset
df1 = pd.DataFrame({"TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30]})
df2 = pd.DataFrame({"TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30]})
df3 = pd.DataFrame({"TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30]})

#get unique keys and its values
df1.groupby('TimeseriesID')['value'].apply(list).to_json('df1.json')
df2.groupby('TimeseriesID')['value'].apply(list).to_json('df2.json')
df3.groupby('TimeseriesID')['value'].apply(list).to_json('df3.json')

#as this is an example you can download the output as jsons
files.download('df1.json')
files.download('df2.json')
files.download('df3.json')
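
For reference, each exported file maps every TimeseriesID present in that chunk to its list of values, so df1.json should contain something close to {"A": [10, 20, 5], "B": [30]}. A quick sanity check (assuming it runs in the same notebook, right after the cell above) could be:

import json

# Sanity check: print the per-chunk key/value lists written by to_json above,
# e.g. df1.json -> {"A": [10, 20, 5], "B": [30]}
for name in ('df1.json', 'df2.json', 'df3.json'):
    with open(name) as f:
        print(name, json.load(f))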

Update 06/10/2021: I have adapted the code to the OP's needs. This part creates the refined files.

from google.colab import files
import json

#you should use your own function to get the data from the file
def retrieve_data(uploaded,file):
  return json.loads(uploaded[file].decode('utf-8'))

#you should use your own function to get a list of files to process
def retrieve_files():
  return files.upload()

key_list =[]
#call a function that gets a list of files to process
file_to_process = retrieve_files()

#read every raw file:
for file in file_to_process: 
  file_data = retrieve_data(file_to_process,file)

  for key,value in file_data.items(): 
    if key not in key_list: 
      key_list.append(key)
      with open(f'{key}.json','w') as new_key_file:
        new_json = json.dumps({key:value})
        new_key_file.write(new_json)

    else:
      with open(f'{key}.json','r+') as key_file:
        raw_json = key_file.read()
        old_json = json.loads(raw_json)
        new_json = json.dumps({key:old_json[key]+value})

        key_file.seek(0)
        key_file.write(new_json)
        # truncate in case the rewritten JSON is ever shorter than the old content
        key_file.truncate()

for key in key_list:
  files.download(f'{key}.json')

print(key_list)
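
At this point every refined file should hold the merged list for exactly one key; assuming the df1.json, df2.json and df3.json produced earlier were the uploaded raw files, B.json for instance should end up roughly as {"B": [30, 10, 20, 5]}. A quick sanity check could be:

import json

# Sanity check: each refined file maps one key to the values merged from all
# the uploaded raw files, e.g. B.json -> {"B": [30, 10, 20, 5]}
for key in key_list:
    with open(f'{key}.json') as f:
        print(json.load(f))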

Update 07/10/2021: I have updated the code to avoid confusion. This part processes the refined files.

import time
import numpy as np
import pandas as pd

#Once we get the refined values we can use it to apply custom functions
def custom_func(x):
    return np.mean(x) 

#Get key and data content from single json
def get_data(file_data):
    content = file_data.popitem()
    return content[0],content[1]

#load key list and build our refined dictionary
refined_values = []

#call a function that gets a list of files to process
file_to_process = retrieve_files()

start = time.time()
#read every refined file:
for file in file_to_process: 
  #read content of file n
  file_data = retrieve_data(file_to_process,file)
  
  #parse and apply function per file read
  key,data = get_data(file_data)
  func_output = custom_func(data)

  #start building refined list
  refined_values.append([key,func_output])

elapsed = (time.time() - start)
print(elapsed)
  
df = pd.DataFrame.from_records(refined_values,columns=['TimeseriesID','value']).sort_values(by=['TimeseriesID'])
df = df.reset_index(drop=True)
print(df.head())

The output will be:

0.00045609474182128906
  TimeseriesID      value
0            A  11.666667
1            B  16.250000
2            C  20.000000
3            D  18.333333

Summary:

  • When working with large datasets, always focus on the data you actually need and keep it to a minimum. Use only the relevant values.

  • Processing is faster when the work is done with basic operators or native Python libraries.

