首页 > 解决方案 > 设置多线程 python 3

问题描述

我有一个 python 脚本,它获取输入文件列表并在这些文件上运行 cmd。列表中平均大约有 6,000 个文件。(因批次而异)。这个基于 cmd 的程序平均每个输入文件大约需要 1-2 分钟才能完成,因此在一天结束时,处理整个列表大约需要几天时间。

我希望使用多线程在我的几个处理器之间拆分列表,以便可以串联处理列表并最终更快!

输入再次是文件列表,输出被写入与输入文件相同的目录,但为 inputx_output.csv

我似乎无法让多线程工作。这是当前没有多线程的代码段。每次我查看一些多线程引用时,它似乎都不能很好地适应我的代码。

这是一个段,不包含单独运行所需的所有引用/输入。

import shutil as sh
Input_File ='c:/Users/James_Mann1/Desktop/TestBench'

import os
try:
    os.chdir(Input_File)
except:
    catch = 1
try:
    sh.rmtree("Cas-Out")
except:
    catch = 1
try:
    os.mkdir("Cas-Out")
except:
    catch = 1 
#print(os.getcwd())
os.chdir("Cas-OffInput")
path = os.getcwd()
Input_Casoff = os.listdir(path)
src = Input_File + "/cas-offinder.exe"
dst = Input_File + "/Cas-OffInput/cas-offinder.exe"
sh.copyfile(src, dst)
for value in Input_Casoff:
    subprocess.call("cas-offinder " + value + " G0 " + " " + value + "out.txt")
    #os.system("cas-offinder " + value + " G0 " + " " + value + "out.txt")

我不确定从哪里开始?我似乎找不到写这篇文章的好参考。我要做的就是获取列表条目并在它们上串联运行 cmd,因此该过程完成得更快。输出由 cmd 生成,因此不需要捕获任何内容。

编辑,我能够解决最初的问题。发布在下面...现在我想返回一个计数,它告诉我还剩多少。

import os
from queue import Queue
from threading import Thread
from time import sleep
import shutil as sh
import time
#%%
import shutil as sh
Input_File ='c:/Users/James_Mann1/Desktop/TestBench'

import os
try:
    os.chdir(Input_File)
except:
    catch = 1
try:
    sh.rmtree("Cas-Out")
except:
    catch = 1
try:
    os.mkdir("Cas-Out")
except:
    catch = 1
#print(os.getcwd())
os.chdir("Cas-OffInput")
path = os.getcwd()
Input_Casoff = os.listdir(path)
src = Input_File + "/cas-offinder.exe"
dst = Input_File + "/Cas-OffInput/cas-offinder.exe"
sh.copyfile(src, dst)

Task_Count = len(Input_Casoff)

from multiprocessing import Pool

def Cas_off(x):
    os.system("cas-offinder " + x + " G0 " + " " + x + "out.txt")
    #print(x)
    return x

if __name__ == '__main__':
    with Pool(8) as p:
        print(p.map(Cas_off, Input_Casoff))

关于实施获取剩余条目的方法的任何建议?所以

x/4000 已完成或正在处理 x/4000。

谢谢!

标签: pythonmultithreading

解决方案


一种方法是使用mpi4py,它是 MPI(消息传递接口)的 python 版本,它使您能够进行并行计算。MPI 是一个相当大的库,使您能够在多个处理器上运行程序,甚至可以在这些处理器之间进行通信。

在 MAC 上,我使用 brew 安装了它:

brew install open-mpi

然后,您可以使用 pip 安装 mpi4py:

pip3 install mpi4py

那么你的代码结构应该是这样的:

from mpi4py import MPI

comm = MPI.COMM_WORLD # MPI communicator
size = comm.Get_size() # Number of processors
rank = comm.Get_rank() # Rank of the processor

# Input and output file names
nb_files = 10
input_files = [f'input_file_{i}.txt' for i in range(nb_files)]
output_files = [f'output_file_{i}.txt' for i in range(nb_files)]

# Loop on all files to process
for i in range(len(input_files)):
    # The current processor will process only files at indexes
    # equal to his rank modulo the total number of processors
    # else it will skip the file.
    if (i%size==rank):
        # Here processor rank must:
        # - read file input_files[i]
        # - process the data
        # - write file output_files[i]
        print(f'CPU{rank} transforms "{input_files[i]}" into "{output_files[i]}".') # dummy code

您可以使用以下命令执行代码(将 4 替换为所需的 CPU 数量,将 script.py 替换为您的 python 程序文件的名称):

mpiexec -n 4 python3 script.py

输出:

% mpiexec -n 4 python3 script.py
CPU3 transforms "input_file_3.txt" into "output_file_3.txt".
CPU3 transforms "input_file_7.txt" into "output_file_7.txt".
CPU1 transforms "input_file_1.txt" into "output_file_1.txt".
CPU1 transforms "input_file_5.txt" into "output_file_5.txt".
CPU1 transforms "input_file_9.txt" into "output_file_9.txt".
CPU0 transforms "input_file_0.txt" into "output_file_0.txt".
CPU0 transforms "input_file_4.txt" into "output_file_4.txt".
CPU0 transforms "input_file_8.txt" into "output_file_8.txt".
CPU2 transforms "input_file_2.txt" into "output_file_2.txt".
CPU2 transforms "input_file_6.txt" into "output_file_6.txt".

小心避免多个处理器处理同一个文件以避免冲突(所有文件名应该不同)。同样,MPI 是一个非常大的库,您可以想象使用它添加通信(例如每分钟)以减少所有处理器之间完成的文件数量。

希望这可以帮助。


推荐阅读