python - Python 多处理数百个文件
问题描述
我有数百到数千个文件,我试图在处理过程中并行化,但我遇到了逻辑问题。
我的目标是让 8 个线程在一个进程上工作(由于处理 pandas 数据帧而不确定是否可能),或者让 8 个线程分别在一个文件上独立工作。
下面是我的应用程序中的一些简化代码。老实说,我真的不知道它在做什么。当我运行应用程序时,它开始浏览文件,但是当我打印文件名时,它们都乱了序。几分钟后,它的速度变慢了,但仍在打印乱序的随机字段名称。看起来东西被附加到输出文件中,但我不知道它们来自哪里。
我是否需要将我的文件列表分块并一次获取 8 个文件并以这种方式处理?还是我的代码在我的处理方式上不正确?我尝试了 Pool 和 Process 类,但似乎都不适用于这个用例。
import os.path
from os import path
import pandas as pd
import numpy as np
import math
from multiprocessing import Pool, Manager, Process
import multiprocessing as mp
from concurrent import futures as cf
from multiprocessing.pool import ThreadPool
def apply_ref_to_ind(input_df, temp_ref_df):
final_df = pd.merge(input_df, temp_ref_df, how='outer', on='CODE')
final_df['CALC2'] = final_df['CALC1'] - temp_ref_df['CALC1']
return final_df
def worker(input_file):
"""Worker process for operating on partitioned reference data"""
temp_input = pd.read_csv(input_file, dtype=str)
code = temp_input['CODE'].unique()[0]
temp_ref_df = ref_df.loc[ref_df['CODE'] == code]
print(input_file)
return_df = apply_ref_to_ind(temp_input, temp_ref_df)
if path.exists("output_file.csv"):
return_df[final_layout].to_csv('output_file.csv', index=None)
else:
return_df[final_layout].to_csv('output_file.csv', mode='a', header=False, index=None)
if __name__ == '__main__':
file_list = ['file.csv', 'file2.csv', 'file3.csv'] # etc...
ref_df = pd.read_csv('reference.csv')
final_layout = ['ID', 'CODE', 'CALC1', 'CALC2']
pool = Pool(8) # Create a multiprocessing Pool
pool.map(worker, [file for file in file_list]) # process data_inputs iterable with pool
解决方案
推荐阅读
- python - 如何为小提琴图添加颜色?
- html - 如何在最大缩放时停止表格溢出外部 div
- node.js - 我想在数组中聚合到数组 mongodb
- chess - 如何理解 Chess.js 的 fen 参数
- javascript - 使用调整后的大小 Javascript 更改图像大小
- html - 如何将 html 代码添加到 laravel 控制器?
- python - python中的最小值和最大值?
- ios - 当用户点击 InputAccessoryView 时如何滚动到 CollectionView 的底部
- java - 在 MongoDB 中,$set 比 $addToSet $each 快吗?
- firebase - Firebase 存储读取安全规则似乎没有任何效果