首页 > 解决方案 > 即使在 .join() 之后,多处理也会创建僵尸进程

问题描述

我有一大段代码,我正在以下列方式并行化:-

def stats_wrapper(a,b,c,d):
    q = mp.Queue()
    # b here is a dictionary
    processes = [mp.Process(target = stats,args = (a,{b1:b[b1]},c,d,q) ) for b1 in b]
    for p in processes:
        p.start()
    results = []
    for p in processes:
        results.append(q.get())
    return(results)

执行此块后,我看到很多僵尸进程。我正在尝试.join()在该行之前以下列方式使用该方法return(result):-

for p in processes:
   p.join()

但这无助于摆脱僵尸进程。有人可以帮我确定我的代码哪里出了问题吗?

编辑: - 我在代码的其他地方使用了另一种并行化方法,这再次给了我许多僵尸进程,除了我不知道如何重构代码以添加连接。

q = mp.Queue()
jobs = (func1,func2,func3)
args = ((arg1,arg2,arg3),(arg2,arg3),(arg1,arg4))
for job,arg in zip(jobs,args):
   mp.Process(target = job,args = arg,name = str(job.__name__)).start()
result = []
for _ in range(len(job)): 
   result.append(q.get())

标签: pythonmultithreadingmultiprocessing

解决方案


如果你愿意尝试更高级别multiprocessing.Pool()

def stats_wrapper(a, b, c, d):
    with multiprocessing.Pool() as p:
        args = [(a, {b1: b[b1]}, c, d) for b1 in b]
        return list(p.starmap(stats, args))

应该是等价的(除了q不被传递给stats; 该函数可以简单地返回结果)。

如果您不需要结果有序,请使用p.imap_unordered(),它可以更有效地使用进程池。


推荐阅读