python - 异常触发后进程返回结果
问题描述
我有一个多处理设置,通过将所有计算值附加到lst
. 它看起来大致是这样的:
from multiprocessing import Pool
from time import sleep
def fun(_):
lst = [] # list that will be returned
for i in range(200):
lst.append(i)
if not i % 10:
sleep(0.1) # 'long task', cause a KeyboardInterrupt in this time
return lst
if __name__ == '__main__':
master = []
processes = 2
for result in Pool(processes).imap_unordered(fun, range(processes)):
master.append(result)
print(master)
我希望能够导致 aKeyboardInterrupt
并让进程返回他们处理的列表,即使它们还没有完成,因为每次迭代只是添加一个新的子列表。(我的实际数据大致如下lst = ([], [[], ...], [[], ...])
,每个空列表只包含整数,实际函数会return lst1, lst2, lst3
)
我试图将整个主要部分封装成try: except:
这样:
try:
for result in Pool(processes).imap_unordered(fun, range(processes)):
master.append(result)
except KeyboardInterrupt:
# somehow retrieve the values here
pass
然而,我还没有以这种方式找到任何可能的解决方案。我如何告诉进程是时候提前退出并返回他们当前的结果了?
编辑以显示实际结构:main.py:
from other import Other
class Something:
def __init__(self):
pass # stuff here
def spawner(self):
for result in Pool(processes=self.processes).imap_unordered(self.loop, range(self.processes)):
pass # do stuff with the data
def loop(self, _):
# setup stuff
Other(setup_stuff).start()
其他.py
class Other:
def __init__(self):
pass # more stuff
def start(self):
lst1, lst2, lst3 = [], [], []
for _ in range(self.episodes):
pass # do the actual computation
return lst1, lst2, lst3
解决方案
也许您可以使用multiprocessing.Queue
而不是 alist
来返回变量。一开始设置一个队列,所有进程都会写入队列。
最后,从队列中读取所有值。
from time import sleep
from multiprocessing import Pool, Queue
q = None
def set_global_data(queue):
global q
q = queue
def fun(_):
for i in range(200):
q.put_nowait(i)
if not i % 10:
sleep(0.1) # 'long task', cause a KeyboardInterrupt in this time
# nothing is returned
if __name__ == "__main__":
master = Queue()
processes = 2
try:
with Pool(processes, set_global_data, (master,)) as p:
for result in p.imap_unordered(fun, range(processes)):
pass
except KeyboardInterrupt:
pass
while not master.empty():
v = master.get_nowait()
print(v)
编辑:有多个文件:
主文件
from other import Other
from multiprocessing import Pool, Queue
class Something:
def __init__(self):
pass # stuff here
def spawner(self):
master = Queue()
try:
with Pool(2, Something.set_global_data, (master,)) as p:
for _ in p.imap_unordered(self.loop, range(2)):
pass
except KeyboardInterrupt:
pass
while not master.empty():
v = master.get_nowait()
print(v)
def loop(self, _):
# setup stuff
Other().start()
@staticmethod
def set_global_data(queue):
Other.q = queue
s = Something()
s.spawner()
其他.py
from time import sleep
class Other:
q = None
def __init__(self):
pass # more stuff
def start(self):
for i in range(200):
Other.q.put_nowait(i)
if not i % 10:
sleep(0.1)
推荐阅读
- firebase - 计划的 Firebase 云函数的首次运行
- raspberry-pi - 如何在 Raspberry OS 上安装 Spyder 4 IDE
- c# - 无法打开 sqlite 数据库,因为来自 Nuget 的 System.Data.SQLite 在 WinForms 软件中更新到版本 1.0.113.6
- html - CSS Button 使用多行,而不是 1
- c# - DotNet Core 3.1 MongoDb 配置多个集合不能做简单的查找
- javascript - 使用 Javascript 以编程方式创建 Azure 存储帐户的最佳选择是什么?
- svg - 如何通过从内存加载 svg 来创建 SDL 纹理?
- java - Is it possible to make POJOs generated with the Avro Maven plugin play nice with Jackson?
- sql - SQL 查询:计算表中不同值的数量
- python - 如何将图像水平分割成相等大小的块?