首页 > 解决方案 > 异常触发后进程返回结果

问题描述

我有一个多处理设置,通过将所有计算值附加到lst. 它看起来大致是这样的:

from multiprocessing import Pool
from time import sleep


def fun(_):
    lst = []  # list that will be returned
    for i in range(200):
        lst.append(i)
        if not i % 10:
            sleep(0.1)  # 'long task', cause a KeyboardInterrupt in this time
    return lst


if __name__ == '__main__':
    master = []
    processes = 2
    for result in Pool(processes).imap_unordered(fun, range(processes)):
        master.append(result)
    print(master)

我希望能够导致 aKeyboardInterrupt并让进程返回他们处理的列表,即使它们还没有完成,因为每次迭代只是添加一个新的子列表。(我的实际数据大致如下lst = ([], [[], ...], [[], ...]),每个空列表只包含整数,实际函数会return lst1, lst2, lst3

我试图将整个主要部分封装成try: except:这样:

try:
    for result in Pool(processes).imap_unordered(fun, range(processes)):
        master.append(result)
except KeyboardInterrupt:
    # somehow retrieve the values here
    pass

然而,我还没有以这种方式找到任何可能的解决方案。我如何告诉进程是时候提前退出并返回他们当前的结果了?

编辑以显示实际结构:main.py:


from other import Other

class Something:
    def __init__(self):
        pass  # stuff here
    
    def spawner(self):
        for result in Pool(processes=self.processes).imap_unordered(self.loop, range(self.processes)):
            pass  # do stuff with the data

    def loop(self, _):
        # setup stuff
        Other(setup_stuff).start()

其他.py


class Other:
    def __init__(self):
        pass  # more stuff

    def start(self):
        lst1, lst2, lst3 = [], [], []
        for _ in range(self.episodes):
            pass  # do the actual computation
        return lst1, lst2, lst3

标签: pythonexceptionmultiprocessing

解决方案


也许您可以使用multiprocessing.Queue而不是 alist来返回变量。一开始设置一个队列,所有进程都会写入队列。

最后,从队列中读取所有值。

from time import sleep
from multiprocessing import Pool, Queue

q = None


def set_global_data(queue):
    global q
    q = queue


def fun(_):
    for i in range(200):
        q.put_nowait(i)
        if not i % 10:
            sleep(0.1)  # 'long task', cause a KeyboardInterrupt in this time
    # nothing is returned


if __name__ == "__main__":
    master = Queue()
    processes = 2

    try:
        with Pool(processes, set_global_data, (master,)) as p:
            for result in p.imap_unordered(fun, range(processes)):
                pass
    except KeyboardInterrupt:
        pass

    while not master.empty():
        v = master.get_nowait()
        print(v)

编辑:有多个文件:

主文件

from other import Other
from multiprocessing import Pool, Queue


class Something:
    def __init__(self):
        pass  # stuff here

    def spawner(self):
        master = Queue()

        try:
            with Pool(2, Something.set_global_data, (master,)) as p:
                for _ in p.imap_unordered(self.loop, range(2)):
                    pass
        except KeyboardInterrupt:
            pass

        while not master.empty():
            v = master.get_nowait()
            print(v)

    def loop(self, _):
        # setup stuff
        Other().start()

    @staticmethod
    def set_global_data(queue):
        Other.q = queue


s = Something()
s.spawner()

其他.py

from time import sleep


class Other:
    q = None

    def __init__(self):
        pass  # more stuff

    def start(self):
        for i in range(200):
            Other.q.put_nowait(i)
            if not i % 10:
                sleep(0.1)

推荐阅读