首页 > 解决方案 > 生成器函数导致在所有进程完成后捕获异常

问题描述

我写了这个简短的 POC 来帮助理解我遇到的问题,希望有人可以向我解释发生了什么以及如何解决它和/或提高它的效率。

我使用迭代器、itertools 和生成器的目标是因为我不想在内存中存储一​​个巨大的列表,因为我扩大列表将变得难以管理,我不想循环整个列表来做某事每一次。请注意,我对生成器、迭代器和多处理的概念相当陌生,并且今天编写了这段代码,所以,如果你能清楚地告诉我想念理解这些东西应该如何工作的工作流程,请教育我并帮助我代码更好。

您应该能够按原样运行代码并查看我面临的问题。我期待一旦捕获到异常,它就会被引发并且脚本终止,但是我看到正在发生的事情,异常被捕获但其他进程继续。

如果我注释掉generateRange生成器并创建一个虚拟列表并将其传递给futures = (map(executor.submit, itertools.repeat(execute), mylist)),则异常会被捕获并按预期退出脚本。

我的猜测是,生成器/迭代器必须在脚本终止之前完成生成范围,据我了解,情况并非如此。

我选择使用生成器函数/迭代器的原因是您只能在需要时访问它们。

有没有办法让我停止生成器继续并让异常适当地引发。

这是我的 POC:

import concurrent.futures

PRIMES = [0]*80

import time

def is_prime(n):
    print("Enter")
    time.sleep(5)
    print("End")
    1/0

child = []
def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        for i in PRIMES:
            child.append(executor.submit(is_prime, i))
        for future in concurrent.futures.as_completed(child):
            if future.exception() is not None:
                print("Throw an exception")
                raise future.exception()

if __name__ == '__main__':
    main()

编辑:我用更简单的东西更新了 POC。

标签: python-3.xiteratormultiprocessinggeneratoritertools

解决方案


不可能立即取消正在运行的期货,但这至少使得在引发异常后只有少数进程运行:

import concurrent.futures                                                  

PRIMES = [0]*80                                                            

import time                                                                

def is_prime(n):                                                           
    print("Enter")                                                         
    time.sleep(5)                                                          
    print("End")                                                           
    1/0                                                                    

child = []                                                                 
def main():                                                                
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        for i in PRIMES:                                                   
            child.append(executor.submit(is_prime, i))                     
        for future in concurrent.futures.as_completed(child):              
            if future.exception() is not None:                             
                for fut in child:                                          
                    fut.cancel()                                           
                print("Throw an exception")                                
                raise future.exception()                                   

if __name__ == '__main__':                                                 
    main()                                                                 

推荐阅读