python - 如何使用所有 CPU 对大量文件进行子处理?
问题描述
我需要在命令行中使用 LaTeXML 库将 86,000 个 TEX 文件转换为 XML。我尝试编写一个 Python 脚本来使用subprocess
模块自动执行此操作,并利用所有 4 个内核。
def get_outpath(tex_path):
path_parts = pathlib.Path(tex_path).parts
arxiv_id = path_parts[2]
outpath = 'xml/' + arxiv_id + '.xml'
return outpath
def convert_to_xml(inpath):
outpath = get_outpath(inpath)
if os.path.isfile(outpath):
message = '{}: Already converted.'.format(inpath)
print(message)
return
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
message = '{}: Converted!'.format(inpath)
print(message)
def start():
start_time = time.time()
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
maxtasksperchild=1)
print('Initialized {} threads'.format(multiprocessing.cpu_count()))
print('Beginning conversion...')
for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5):
pass
pool.close()
pool.join()
print("TIME: {}".format(total_time))
start()
该脚本导致Too many open files
并减慢了我的计算机速度。从 Activity Monitor 来看,这个脚本似乎试图一次创建 86,000 个转换子进程,并且每个进程都试图打开一个文件。也许这是pool.imap_unordered(convert_to_xml, preprints)
- 也许我不需要将 map 与 结合使用subprocess.Popen
,因为我有太多命令要调用?什么是替代方案?
我花了一整天的时间试图找出处理批量子处理的正确方法。我是 Python 的这一部分的新手,所以任何朝着正确方向前进的提示都将不胜感激。谢谢!
解决方案
在convert_to_xml
中,process = subprocess.Popen(...)
语句产生一个latexml
子进程。如果没有诸如 的阻塞调用process.communicate()
,convert_to_xml
即使latexml
在后台继续运行时也会结束。
convert_to_xml
结束后,池向关联的工作进程发送另一个任务以运行,因此再次convert_to_xml
被调用。再次latexml
在后台产生另一个进程。很快,您就掌握了latexml
进程的注意力,并且达到了打开文件数量的资源限制。
修复很简单:添加process.communicate()
到告诉convert_to_xml
等到该latexml
过程完成。
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
process.communicate()
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
else: # use else so that this won't run if there is an Exception
message = '{}: Converted!'.format(inpath)
print(message)
关于if __name__ == '__main__'
:
正如martineau 指出的那样,多处理文档中有一条警告,不应在模块的顶层调用产生新进程的代码。相反,代码应该包含在if __name__ == '__main__'
语句中。
在 Linux 中,如果您忽略此警告,则不会发生任何可怕的事情。但在 Windows 中,代码是“fork-bombs”。或者更准确地说,代码会导致生成一个未缓解的子进程链,因为在 Windows 上fork
是通过生成一个新的 Python 进程来模拟的,该进程然后导入调用脚本。每次导入都会产生一个新的 Python 进程。每个 Python 进程都会尝试导入调用脚本。在消耗所有资源之前,循环不会中断。
因此,为了对我们的 Windows-fork-bereft 兄弟友好,请使用
if __name__ == '__main__:
start()
有时进程需要大量内存。释放内存的唯一可靠方法是终止进程。maxtasksperchild=1
告诉在pool
完成 1 个任务后终止每个工作进程。然后它产生一个新的工作进程来处理另一个任务(如果有的话)。这释放了原始工作人员可能已分配的(内存)资源,而这些资源本来无法释放。
在您的情况下,工作进程似乎不需要太多内存,因此您可能不需要maxtasksperchild=1
. 在convert_to_xml
中,process = subprocess.Popen(...)
语句产生一个latexml
子进程。如果没有诸如 的阻塞调用process.communicate()
,convert_to_xml
即使latexml
在后台继续运行时也会结束。
convert_to_xml
结束后,池向关联的工作进程发送另一个任务以运行,因此再次convert_to_xml
被调用。再次latexml
在后台产生另一个进程。很快,您就掌握了latexml
进程的注意力,并且达到了打开文件数量的资源限制。
修复很简单:添加process.communicate()
到告诉convert_to_xml
等到该latexml
过程完成。
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
process.communicate()
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
else: # use else so that this won't run if there is an Exception
message = '{}: Converted!'.format(inpath)
print(message)
这chunksize
会影响工作人员在将结果发送回主进程之前执行的任务数。
有时这会影响性能,尤其是当进程间通信是整个运行时的重要部分时。
在您的情况下,convert_to_xml
需要相对较长的时间(假设我们等到latexml
完成)并且它只是返回None
. 所以进程间通信可能不是整个运行时的重要部分。因此,我不希望在这种情况下您会发现性能有显着变化(尽管进行实验永远不会有坏处!)。
在普通的 Python 中,map
不应仅用于多次调用函数。
出于类似的风格原因,我会保留pool.*map*
在我关心返回值的情况下使用这些方法。
所以而不是
for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5):
pass
你可以考虑使用
for preprint in preprints:
pool.apply_async(convert_to_xml, args=(preprint, ))
反而。
传递给任何pool.*map*
函数的可迭代对象会立即被使用。可迭代对象是否是迭代器并不重要。在这里使用迭代器没有特殊的内存好处。imap_unordered
返回一个迭代器,但它不以任何特别适合迭代器的方式处理其输入。
无论您传递什么类型的迭代,在调用pool.*map*
函数时,迭代都会被消耗并变成放入任务队列的任务。
这是证实这一说法的代码:
版本1.py:
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x * x
def gen():
for x in range(1000):
if x % 100 == 0:
print('Got here')
yield x
def start():
pool = mp.Pool()
for item in pool.imap_unordered(foo, gen()):
pass
pool.close()
pool.join()
if __name__ == '__main__':
start()
版本2.py:
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x * x
def gen():
for x in range(1000):
if x % 100 == 0:
print('Got here')
yield x
def start():
pool = mp.Pool()
for item in gen():
result = pool.apply_async(foo, args=(item, ))
pool.close()
pool.join()
if __name__ == '__main__':
start()
运行version1.py
并且version2.py
两者都产生相同的结果。
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
至关重要的是,您会注意到Got here
在运行开始时非常快速地打印了 10 次,然后在程序结束之前有很长的停顿(计算完成时)。
如果生成器gen()
被 以某种方式缓慢消耗pool.imap_unordered
,我们也应该期望Got here
打印速度也很慢。由于Got here
打印了 10 次且速度很快,我们可以看到迭代gen()
在任务完成之前就被完全消耗掉了。
运行这些程序应该可以让您相信
pool.imap_unordered
并且pool.apply_async
基本上以相同的方式将任务放入队列中:在调用之后立即进行。
推荐阅读
- mysql - 我收到 PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR
- c++ - 使用 libpq 插入没有 timeZone 的二进制时间戳
- c++ - CMake:根据 C 宏定义有选择地重新编译 C++ 程序模块
- android - 打开推送通知时反应本机应用程序崩溃
- wso2 - 在完全分布式 WSO2 APIM 设置中启动 WSO2 ISKM 5.10.0 服务器时出错
- ios - 如何修复 SWIFT 中 GPUImage 库中的内存问题
- linux - 从 Bash 中数组的各个组件中检索子字符串
- java - 在 Java 中,如何从一个项目调用 POST Web 服务到另一个项目
- python - Django,将值附加到外键列表
- dockerhub - DockerHub 构建的镜像太多