首页 > 解决方案 > 为什么 concurrent.futures.ProcessPoolExecutor() 跳过迭代?

问题描述

我有一个函数,我使用这样的concurrent.futures.ProcessPoolExecutor()东西并行化:

with concurrent.futures.ProcessPoolExecutor() as executor:
    executor.map(my_func, ids, it.repeat(num_ids))

whereids是由两个元素组成的元组列表。第一个元素包含一个整数,对于每个后续元组递增 1。我用它来创建一个“迭代进度跟踪器”。第二个元素包含输入my_func用途。

my_func在这里添加太长了,我无法获得具有互惠行为的 MRE。但是,它看起来像这样:

def my_func(id, num_ids):
    print(f"{id[0]} of {num_ids}")
    # Extract something from a database, transform it and then add the new data back into the database

在一次运行中,我注意到在迭代跟踪器上大约 5k 时,它突然跳到 10k,结结巴巴,然后继续前进。之后,它每隔一段时间就会跳过一些记录。如果我再次运行代码,这种模式会重复,但每次跳过的地方都略有不同。

我在 VS Code 中正式进入调试模式,但令我惊讶的是,从调试器运行代码时没有跳过任何记录。没有错误,什么都没有。

我发现停止在调试器之外跳过的唯一方法是将max_workers参数设置为我的线程的一半。

我意识到如果没有 MRE,这很难诊断,但我希望其他人可能遇到过这个问题或认识到这些症状?

标签: pythonconcurrent.futures

解决方案


鉴于所提供的信息,不容易回答具体案例。
我最好的猜测是my_func.
实际上,如果发生异常,则不会打印任何内容,并且不会终止执行。
为了验证我的假设,我将定义一个装饰器来打印函数中发生的异常:

import functools

def log_function(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            print(args, kwargs, repr(exc))

    return wrapper

并将装饰器应用于函数。

@log_function
def my_func(my_id, num_ids):
   ...

推荐阅读