python - 多处理和慢速文件系统
问题描述
假设我们有以下脚本 ( read_file.py
),它读取一个文件并将前 200000 行写入另一个文件。
import pandas as pd
import sys
a = sys.argv[1]
b = sys.argv[2]
df = pd.read_csv(a, header=0, sep="\t").head(200000).to_csv(b, header=True, index=False)
让我们有第二个脚本 ( test-latency.py
),它使用多处理(在两个文件上)调用第一个脚本。然后读取生成的两个文件并将它们合并。
import pandas as pd
import multiprocessing as mp
import sys
import subprocess
import time
a = sys.argv[1]
b = sys.argv[2]
l = [a, b]
pool = mp.Pool(processes = (mp.cpu_count() - 1))
for filename in l:
f_in = filename
f_out = filename + "out.tsv"
cmd = ['python', 'read_file.py', f_in, f_out]
pool.apply_async(subprocess.Popen, (cmd,))
pool.close()
pool.join()
time.sleep(1)
df1 = pd.read_csv(a + "out.tsv")
df2 = pd.read_csv(b + "out.tsv")
df = pd.merge(df1, df2, on="Name").to_csv("test.tsv", sep="\t", header=0)
问题在于,根据文件系统速度(由于 NFS 缓存),pool.join() 之后文件可能不存在。这通过time.sleep(1)
延迟直到文件出现以某种方式解决。但这不是最佳解决方案,因为对于慢速文件系统,它可能会导致FileNotFoundError: [Errno 2]
. 一种解决方案是提供一个通用的延迟等待选项,但我认为让用户参与此类决策并不明智。你对这个问题有什么建议?
解决方案
文件系统缓存不是您的问题。您正在使用 创建子流程multiprocessing.Pool
,并且每个子流程本身都使用 生成子流程subprocess.Popen
。问题是它subprocess.Popen
只是产生了进程,而不是等待完成。因此,即使子进程multiprocessing.Pool
全部完成后,这些孙子进程可能仍在运行。
一个简单的解决方法是自己管理子流程:
# Store subprocess handles (Popen objects).
subprocesses = []
# Launch subprocesses in the background.
for filename in l:
f_in = filename
f_out = filename + "out.tsv"
proc = subprocess.Popen(['python', 'read_file.py', f_in, f_out])
subprocesses.append(proc)
# Wait for each subprocess to finish.
for proc in subprocesses:
if proc.wait() != 0:
# Error occurred, handle it however you want
raise RuntimeError('Subprocess failed with nonzero exit code')
还有一个问题是并行执行此操作是否有用,因为 I/O(网络或磁盘)可能是瓶颈。但这是你可以自己测试的东西。
推荐阅读
- r - 是否有自动化(手动)R代码来计算P值
- css - 在非全屏和全屏模式下,Chrome Mobile 屏幕底部的高度元素相同
- github - 使用持续部署发布机器人后不显示图像
- c# - 始终定义 Visual Studio 2013 DEBUG 预处理器指令
- java - Retrofit POST request using @Body returns empty response body
- uwp - UWP:NavigationView 无法更改面板背景
- c - 如何解决编译错误:警告“使用此标头的文件必须使用 _SVID_SOURCE 或 _XOPEN_SOURCE 编译”
- java - 在 UTC 中使用到期日期时,JWT 的到期不起作用
- c# - 如何动态替换 json 附带的属性?
- json - 状态元素没有出现在带有 react-tooltip 的反应简单地图上