python - 设置多线程 python 3
问题描述
我有一个 python 脚本,它获取输入文件列表并在这些文件上运行 cmd。列表中平均大约有 6,000 个文件。(因批次而异)。这个基于 cmd 的程序平均每个输入文件大约需要 1-2 分钟才能完成,因此在一天结束时,处理整个列表大约需要几天时间。
我希望使用多线程在我的几个处理器之间拆分列表,以便可以串联处理列表并最终更快!
输入再次是文件列表,输出被写入与输入文件相同的目录,但为 inputx_output.csv
我似乎无法让多线程工作。这是当前没有多线程的代码段。每次我查看一些多线程引用时,它似乎都不能很好地适应我的代码。
这是一个段,不包含单独运行所需的所有引用/输入。
import shutil as sh
Input_File ='c:/Users/James_Mann1/Desktop/TestBench'
import os
try:
os.chdir(Input_File)
except:
catch = 1
try:
sh.rmtree("Cas-Out")
except:
catch = 1
try:
os.mkdir("Cas-Out")
except:
catch = 1
#print(os.getcwd())
os.chdir("Cas-OffInput")
path = os.getcwd()
Input_Casoff = os.listdir(path)
src = Input_File + "/cas-offinder.exe"
dst = Input_File + "/Cas-OffInput/cas-offinder.exe"
sh.copyfile(src, dst)
for value in Input_Casoff:
subprocess.call("cas-offinder " + value + " G0 " + " " + value + "out.txt")
#os.system("cas-offinder " + value + " G0 " + " " + value + "out.txt")
我不确定从哪里开始?我似乎找不到写这篇文章的好参考。我要做的就是获取列表条目并在它们上串联运行 cmd,因此该过程完成得更快。输出由 cmd 生成,因此不需要捕获任何内容。
编辑,我能够解决最初的问题。发布在下面...现在我想返回一个计数,它告诉我还剩多少。
import os
from queue import Queue
from threading import Thread
from time import sleep
import shutil as sh
import time
#%%
import shutil as sh
Input_File ='c:/Users/James_Mann1/Desktop/TestBench'
import os
try:
os.chdir(Input_File)
except:
catch = 1
try:
sh.rmtree("Cas-Out")
except:
catch = 1
try:
os.mkdir("Cas-Out")
except:
catch = 1
#print(os.getcwd())
os.chdir("Cas-OffInput")
path = os.getcwd()
Input_Casoff = os.listdir(path)
src = Input_File + "/cas-offinder.exe"
dst = Input_File + "/Cas-OffInput/cas-offinder.exe"
sh.copyfile(src, dst)
Task_Count = len(Input_Casoff)
from multiprocessing import Pool
def Cas_off(x):
os.system("cas-offinder " + x + " G0 " + " " + x + "out.txt")
#print(x)
return x
if __name__ == '__main__':
with Pool(8) as p:
print(p.map(Cas_off, Input_Casoff))
关于实施获取剩余条目的方法的任何建议?所以
x/4000 已完成或正在处理 x/4000。
谢谢!
解决方案
一种方法是使用mpi4py,它是 MPI(消息传递接口)的 python 版本,它使您能够进行并行计算。MPI 是一个相当大的库,使您能够在多个处理器上运行程序,甚至可以在这些处理器之间进行通信。
在 MAC 上,我使用 brew 安装了它:
brew install open-mpi
然后,您可以使用 pip 安装 mpi4py:
pip3 install mpi4py
那么你的代码结构应该是这样的:
from mpi4py import MPI
comm = MPI.COMM_WORLD # MPI communicator
size = comm.Get_size() # Number of processors
rank = comm.Get_rank() # Rank of the processor
# Input and output file names
nb_files = 10
input_files = [f'input_file_{i}.txt' for i in range(nb_files)]
output_files = [f'output_file_{i}.txt' for i in range(nb_files)]
# Loop on all files to process
for i in range(len(input_files)):
# The current processor will process only files at indexes
# equal to his rank modulo the total number of processors
# else it will skip the file.
if (i%size==rank):
# Here processor rank must:
# - read file input_files[i]
# - process the data
# - write file output_files[i]
print(f'CPU{rank} transforms "{input_files[i]}" into "{output_files[i]}".') # dummy code
您可以使用以下命令执行代码(将 4 替换为所需的 CPU 数量,将 script.py 替换为您的 python 程序文件的名称):
mpiexec -n 4 python3 script.py
输出:
% mpiexec -n 4 python3 script.py
CPU3 transforms "input_file_3.txt" into "output_file_3.txt".
CPU3 transforms "input_file_7.txt" into "output_file_7.txt".
CPU1 transforms "input_file_1.txt" into "output_file_1.txt".
CPU1 transforms "input_file_5.txt" into "output_file_5.txt".
CPU1 transforms "input_file_9.txt" into "output_file_9.txt".
CPU0 transforms "input_file_0.txt" into "output_file_0.txt".
CPU0 transforms "input_file_4.txt" into "output_file_4.txt".
CPU0 transforms "input_file_8.txt" into "output_file_8.txt".
CPU2 transforms "input_file_2.txt" into "output_file_2.txt".
CPU2 transforms "input_file_6.txt" into "output_file_6.txt".
小心避免多个处理器处理同一个文件以避免冲突(所有文件名应该不同)。同样,MPI 是一个非常大的库,您可以想象使用它添加通信(例如每分钟)以减少所有处理器之间完成的文件数量。
希望这可以帮助。
推荐阅读
- typescript - 是否可以在 vscode 中调暗打字稿类型注释?
- sql - 使用 SQL 查询从 Access 中的表中删除最后 10 行
- javascript - 为什么我的 NULL 检查在 Node.js 中总是失败?
- ios - 如何使用 FileManager 替换整个文件
- c++ - 减少抖动应该重点关注哪个部分?
- sockets - 为什么 BlueZ 中有两种不同的套接字选项(RFCOMM 和 L2CAP)?
- javascript - DOM 选择器在 Vanilla JavaScript 中给出 null 但在 jquery 中工作
- c++ - 如何将外部库链接到 msvc cl 编译器
- javascript - Javascript UTC日期不同的值
- c# - Asp.Net Core 5.0 中 EntityState.Detached 的空异常