首页 > 解决方案 > Python 多处理数百个文件

问题描述

我有数百到数千个文件,我试图在处理过程中并行化,但我遇到了逻辑问题。

我的目标是让 8 个线程在一个进程上工作(由于处理 pandas 数据帧而不确定是否可能),或者让 8 个线程分别在一个文件上独立工作。

下面是我的应用程序中的一些简化代码。老实说,我真的不知道它在做什么。当我运行应用程序时,它开始浏览文件,但是当我打印文件名时,它们都乱了序。几分钟后,它的速度变慢了,但仍在打印乱序的随机字段名称。看起来东西被附加到输出文件中,但我不知道它们来自哪里。

我是否需要将我的文件列表分块并一次获取 8 个文件并以这种方式处理?还是我的代码在我的处理方式上不正确?我尝试了 Pool 和 Process 类,但似乎都不适用于这个用例。

import os.path
from os import path
import pandas as pd
import numpy as np
import math
from multiprocessing import Pool, Manager, Process
import multiprocessing as mp
from concurrent import futures as cf
from multiprocessing.pool import ThreadPool


def apply_ref_to_ind(input_df, temp_ref_df):

    final_df = pd.merge(input_df, temp_ref_df, how='outer', on='CODE')
    final_df['CALC2'] = final_df['CALC1'] - temp_ref_df['CALC1']

    return final_df


def worker(input_file):
    """Worker process for operating on partitioned reference data"""

    temp_input = pd.read_csv(input_file, dtype=str)
    code = temp_input['CODE'].unique()[0]
    temp_ref_df = ref_df.loc[ref_df['CODE'] == code]

    print(input_file)
    return_df = apply_ref_to_ind(temp_input, temp_ref_df)

    if path.exists("output_file.csv"):
        return_df[final_layout].to_csv('output_file.csv', index=None)
    else:
        return_df[final_layout].to_csv('output_file.csv', mode='a', header=False, index=None)


if __name__ == '__main__':
    file_list = ['file.csv', 'file2.csv', 'file3.csv']  # etc...
    ref_df = pd.read_csv('reference.csv')
    final_layout = ['ID', 'CODE', 'CALC1', 'CALC2']

    pool = Pool(8)  # Create a multiprocessing Pool
    pool.map(worker, [file for file in file_list])  # process data_inputs iterable with pool

标签: pythonpython-3.xmultithreadingpandasmultiprocessing

解决方案


推荐阅读