首页 > 解决方案 > 多处理和慢速文件系统

问题描述

假设我们有以下脚本 ( read_file.py),它读取一个文件并将前 200000 行写入另一个文件。

import pandas as pd
import sys

a = sys.argv[1]
b = sys.argv[2]

df = pd.read_csv(a, header=0, sep="\t").head(200000).to_csv(b, header=True, index=False)

让我们有第二个脚本 ( test-latency.py),它使用多处理(在两个文件上)调用第一个脚本。然后读取生成的两个文件并将它们合并。

import pandas as pd
import multiprocessing as mp
import sys
import subprocess
import time

a = sys.argv[1]
b = sys.argv[2]

l = [a, b]

pool = mp.Pool(processes = (mp.cpu_count() - 1))
for filename in l:
    f_in = filename
    f_out = filename + "out.tsv"
    cmd = ['python', 'read_file.py', f_in, f_out]
    pool.apply_async(subprocess.Popen, (cmd,))
pool.close()
pool.join()

time.sleep(1)

df1 = pd.read_csv(a + "out.tsv")
df2 = pd.read_csv(b + "out.tsv")

df = pd.merge(df1, df2, on="Name").to_csv("test.tsv", sep="\t", header=0)

问题在于,根据文件系统速度(由于 NFS 缓存),pool.join() 之后文件可能不存在。这通过time.sleep(1)延迟直到文件出现以某种方式解决。但这不是最佳解决方案,因为对于慢速文件系统,它可能会导致FileNotFoundError: [Errno 2]. 一种解决方案是提供一个通用的延迟等待选项,但我认为让用户参与此类决策并不明智。你对这个问题有什么建议?

标签: pythonperformancemultiprocessing

解决方案


文件系统缓存不是您的问题。您正在使用 创建子流程multiprocessing.Pool,并且每个子流程本身都使用 生成子流程subprocess.Popen。问题是它subprocess.Popen只是产生了进程,而不是等待完成。因此,即使子进程multiprocessing.Pool全部完成后,这些孙子进程可能仍在运行。

一个简单的解决方法是自己管理子流程:

# Store subprocess handles (Popen objects).
subprocesses = []

# Launch subprocesses in the background.
for filename in l:
    f_in = filename
    f_out = filename + "out.tsv"
    proc = subprocess.Popen(['python', 'read_file.py', f_in, f_out])
    subprocesses.append(proc)

# Wait for each subprocess to finish.
for proc in subprocesses:
    if proc.wait() != 0:
        # Error occurred, handle it however you want
        raise RuntimeError('Subprocess failed with nonzero exit code')

还有一个问题是并行执行此操作是否有用,因为 I/O(网络或磁盘)可能是瓶颈。但这是你可以自己测试的东西。


推荐阅读