首页 > 解决方案 > 需要通过从 gzip 文件中读取,在 Python 中使用 2+ 个进程进行 CPU 绑定处理

问题描述

我有一个 gzip 文件跨越(压缩 10GB,未压缩 100GB),其中有一些报告由分界线分隔,我必须解析它。解析和处理数据需要很长时间,因此是 CPU 绑定问题(不是 IO 绑定问题)。所以我打算使用multiprocessing模块将工作分成多个流程。问题是我无法有效地向子进程发送/共享数据。我正在使用subprocess.Popen流式传输父进程中的未压缩数据。

process = subprocess.Popen('gunzip --keep --stdout big-file.gz',
                           shell=True, 
                           stdout=subprocess.PIPE)

我正在考虑使用 Lock() 读取/解析 child-process-1 中的一个报告,然后释放锁,然后切换到 child-process-2 以读取/解析下一个报告,然后切换回 child-process- 1 读取/解析下一个报告)。当我process.stdout与子进程共享 as args 时,出现酸洗错误。

我试图创建数据multiprocessing.Queue()并将multiprocessing.Pipe()数据发送到子进程,但这太慢了(实际上它比在单线程中执行它要慢得多,即串行)。

有关有效地将数据发送到子进程的任何想法/示例都会有所帮助。

标签: python-3.xpython-multiprocessing

解决方案


你可以尝试一些简单的东西吗?让每个工作进程运行自己的 实例gunzip,根本没有进程间通信。Worker 1 可以处理第一个报告,而跳过第二个报告。工人 2 则相反。每个工人都跳过其他报告。N然后是对工人的明显概括。

或不 ...

我认为您需要更具体地说明您的尝试,并可能提供有关您的问题的更多信息(例如:有多少条记录?它们有多大?)。

这是一个程序(“genints.py”),它打印一堆随机整数,每行一个,通过“xxxxx\n”分隔线分成组:

from random import randrange, seed

seed(42)
for i in range(1000):
    for j in range(randrange(1, 1000)):
        print(randrange(100))
    print("xxxxx")

因为它强制种子,所以每次都会生成相同的东西。现在是一个程序,可以通过我最初想到的最明显的方式并行和串行地处理这些组。 crunch()花费时间与组中的整数数量成二次方,因此它非常受 CPU 限制。一次运行的输出,并行部分使用(如图所示)3 个工作进程:

parallel result: 10,901,000,334 0:00:35.559782
serial   result: 10,901,000,334 0:01:38.719993

所以并行化运行大约需要三分之一的时间。这您的问题有什么不同?当然,“genints.py”的完整运行产生不到 200 万字节的输出,所以这是一个主要区别 - 但从这里无法猜测这是否是相关的区别。Perahps,例如,您的问题只是非常轻微地受 CPU 限制?从这里的输出可以明显看出,将标准输出块传递给工作进程的开销在这个程序中几乎是微不足道的。

简而言之,你可能需要给人们——就像我刚刚为你做的那样——一个他们可以运行的完整程序来重现你的问题。

import multiprocessing as mp

NWORKERS = 3
DELIM = "xxxxx\n"

def runjob():
    import subprocess
    # 'py' is just a shell script on my box that
    # invokes the desired version of Python -
    # which happened to be 3.8.5 for this run.
    p = subprocess.Popen("py genints.py",
                         shell=True,
                         text=True,
                         stdout=subprocess.PIPE)
    return p.stdout

# Return list of lines up to (but not including) next DELIM,
# or EOF. If the file is already exhausted, return None.
def getrecord(f):
    result = []
    foundone = False
    for line in f:
        foundone = True
        if line == DELIM:
            break
        result.append(line)
    return result if foundone else None

def crunch(rec):
    total = 0
    for a in rec:
       for b in rec:
          total += abs(int(a) - int(b))
    return total
        
if __name__ == "__main__":
    import datetime
    now = datetime.datetime.now

    s = now()
    total = 0
    f = runjob()
    with mp.Pool(NWORKERS) as pool:
        for i in pool.imap_unordered(crunch,
                                     iter((lambda: getrecord(f)), None)):
            total += i
    f.close()
    print(f"parallel result: {total:,}", now() - s)

    s = now()
    # try the same thing serially
    total = 0
    f = runjob()
    while True:
        rec = getrecord(f)
        if rec is None:
            break
        total += crunch(rec)
    f.close()
    print(f"serial   result: {total:,}", now() - s)

推荐阅读