python - 为什么 concurrent.futures.ProcessPoolExecutor() 跳过迭代?
问题描述
我有一个函数,我使用这样的concurrent.futures.ProcessPoolExecutor()
东西并行化:
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(my_func, ids, it.repeat(num_ids))
whereids
是由两个元素组成的元组列表。第一个元素包含一个整数,对于每个后续元组递增 1。我用它来创建一个“迭代进度跟踪器”。第二个元素包含输入my_func
用途。
my_func
在这里添加太长了,我无法获得具有互惠行为的 MRE。但是,它看起来像这样:
def my_func(id, num_ids):
print(f"{id[0]} of {num_ids}")
# Extract something from a database, transform it and then add the new data back into the database
在一次运行中,我注意到在迭代跟踪器上大约 5k 时,它突然跳到 10k,结结巴巴,然后继续前进。之后,它每隔一段时间就会跳过一些记录。如果我再次运行代码,这种模式会重复,但每次跳过的地方都略有不同。
我在 VS Code 中正式进入调试模式,但令我惊讶的是,从调试器运行代码时没有跳过任何记录。没有错误,什么都没有。
我发现停止在调试器之外跳过的唯一方法是将max_workers
参数设置为我的线程的一半。
我意识到如果没有 MRE,这很难诊断,但我希望其他人可能遇到过这个问题或认识到这些症状?
解决方案
鉴于所提供的信息,不容易回答具体案例。
我最好的猜测是my_func
.
实际上,如果发生异常,则不会打印任何内容,并且不会终止执行。
为了验证我的假设,我将定义一个装饰器来打印函数中发生的异常:
import functools
def log_function(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as exc:
print(args, kwargs, repr(exc))
return wrapper
并将装饰器应用于函数。
@log_function
def my_func(my_id, num_ids):
...
推荐阅读
- android - 如何在 Kotlin 中通过 Android Studio 中的意图传递变量?
- c++ - 如果参数通过值传递,为什么这个程序可以工作?
- javascript - JavaScript window.innerwidth 不能正常工作
- reactjs - 使用历史位置键反应 UseEffect
- javascript - 是否可以列出所有已知的本地化货币?
- javascript - 当单击元素时,它会出现在其他地方 javascript/jquery
- javascript - 为什么我们需要 Typescript 中的 any 类型?
- javascript - 加载时为 D3 热图单元设置动画?
- c++ - 如何使用函数的输出初始化 const 数组结构字段?
- python - python打印函数中希腊字母的下标问题