Python: parallel processing while yielding

Problem description

I'm creating a generator of lines from a list of files, roughly like this:

def load(f_name: str):
    with open(f_name, "r") as f:
        lines = f.readlines()
    # some calculations
    return lines

def iter_files(f_names: list):
    for f in f_names:
        for line in load(f):
            yield line

What I'd like to do, if possible and useful, is to load the next file while yielding lines from the current one. As a newcomer to multiprocessing, I tried the following:

cache = dict()

def load(f_name: str, id: int):
    global cache
    with open(f_name, "r") as f:
        lines = f.readlines()
    # some calculations
    cache[id] = lines

def iter_list(arr):
    for x in arr:
        yield x

def iter_files(f_names: list):
    global cache
    num_files = len(f_names)
    load(f_names[0], 0)
    for n in range(num_files - 1):
        current = mp.Process(target=iter_list, args=(cache[n],))
        next = mp.Process(target=load, args=(f_names[n + 1], n + 1))
        current.start()
        next.start()
        current.join()
        next.join()
        del cache[n]
    iter_list(cache[num_files - 1])
    del cache[num_files - 1]

But besides looking overly complicated, it doesn't work.

First, if I don't put the main code inside 'if __name__ == "__main__":' (which I'd prefer not to be mandatory), I get the following error:

RuntimeError:
      An attempt has been made to start a new process before the
      current process has finished its bootstrapping phase.

But even when I do, the files are never added to the cache:

current = mp.Process(target=iter_list, args=(cache[n],))
KeyError: 1

Is it possible to achieve what I'm trying to do? What exactly am I doing wrong?

Thank you all

Tags: python, multiprocessing, yield

Solution

The multiprocessing.Queue class is a perfect fit here: the child process puts lines in on one end, and the main process gets them back out on the other. (This also explains the KeyError in your attempt: each mp.Process runs in a separate interpreter with its own copy of the module's globals, so the child's writes to cache never reach the parent.) Unfortunately, there is no built-in way to mark a queue as "finished", so we put a sentinel value such as None to signal that all lines have been processed.

import multiprocessing as mp

def load(f_name: str):
    with open(f_name, "r") as f:
        lines = f.readlines()
    # some calculations
    return lines

def iter_files(f_names: list, queue: mp.Queue):
    for f in f_names:
        for line in load(f):
            queue.put(line)
    queue.put(None)

def iter_files_process(f_names: list):
    queue = mp.Queue()
    process = mp.Process(target=iter_files, args=(f_names, queue))
    process.start()
    while True:
        line = queue.get()
        if line is None:  # End-of-queue value.
            break
        yield line
    process.join()  # Wait for the process to be completely finished.

if __name__ == "__main__":
    for line in iter_files_process(['a.txt', 'b.txt']):
        print(line, end='')
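As an aside (my note, not part of the original answer), here is a minimal sketch, using nothing beyond the standard library, that demonstrates why the cache approach in the question raises KeyError: a write to a module-level dict inside the child process never reaches the parent.

```python
import multiprocessing as mp

cache = {}  # module-level dict, as in the question

def load_into_cache(key):
    # Runs in the child process: this mutates the *child's* copy of cache.
    cache[key] = ["line1\n", "line2\n"]

def child_write_visible():
    p = mp.Process(target=load_into_cache, args=(1,))
    p.start()
    p.join()
    return 1 in cache  # checked in the parent process

if __name__ == "__main__":
    # The child filled its own copy of cache; the parent's copy is untouched.
    print(child_write_visible())  # False
```

This is exactly why the answer above routes data through a Queue, which is designed for cross-process communication, instead of a shared global.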
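Two small refinements worth knowing (my additions, not from the original answer): mp.Queue accepts a maxsize argument, so the loader cannot run arbitrarily far ahead of the consumer and buffer every line in memory; and the sentinel loop can be written with the two-argument form of iter(), which keeps calling queue.get() until it returns the sentinel. A sketch under those assumptions:

```python
import multiprocessing as mp

def producer(queue):
    for line in ["a\n", "b\n", "c\n"]:
        queue.put(line)
    queue.put(None)  # sentinel: no more lines

def read_all():
    # maxsize bounds memory use: put() blocks once 2 lines are buffered.
    queue = mp.Queue(maxsize=2)
    proc = mp.Process(target=producer, args=(queue,))
    proc.start()
    # iter(callable, sentinel) calls queue.get() until it returns None.
    lines = list(iter(queue.get, None))
    proc.join()
    return lines

if __name__ == "__main__":
    print(read_all())  # ['a\n', 'b\n', 'c\n']
```

With maxsize set, a slow consumer simply makes the producer pause on put(), so throughput degrades gracefully instead of memory growing without bound.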
