python - 如何忽略任务失败的工作人员并将其任务重新分配给其他工作人员?
问题描述
我正在使用 client.map 在 N 个单线程工作人员(在 N 台机器上)的池上运行一个函数,其中一个工作人员失败了。我想知道是否有一种方法可以自动处理工作人员提出的异常,将其失败的任务重新分配给其他工作人员,并将其从池中忽略或排除?
我试过用下面显示的方法模拟这个问题。为了导致一名工作人员失败,我在其中提出了一个 OSError ,my_function
提交给client.map
如下:futures = client.map(my_function, range(100))
在我的示例中,“Computer123”上的工作人员将失败。为了处理由 引发的异常my_function
,我使用 sys.exit 在exception_handler
. 因此,当工作人员的任务失败时,会调用 sys.exit。结果是坏的worker的distributed.nanny捕获了故障并重新启动了worker,而客户端重新分配了它失败的任务。但是一旦坏工人再次备份,它会再次接收任务,因为它仍在池中。它再次失败并重复该过程。随着它继续失败,最终其他工人完成了所有任务。如果我可以自动处理来自“Computer123”等不良工作人员的异常并将其从池中删除,那将是理想的。也许我需要做的就是将其从池中移除?
@exception_handler
def my_function(x):
import socket
import time
time.sleep(5)
if socket.gethostname() == 'Computer123':
raise(OSError)
else:
return x**2
def exception_handler(orig_func):
def wrapper(*args,**kwargs):
try:
return orig_func(*args,**kwargs)
except:
import sys
sys.exit(1)
return wrapper
解决方案
作为一种解决方法,您可以保留一个坏工作人员的字典,每次确定它是坏的(可能在它引发一定数量的异常之后)时添加主机名。
然后,当您要发布某些任务时,请检查它是否在违规列表中。就像是:
if socket.gethostname() in badHosts:
skip
else:
do_something()
如果您可以提供有关如何管理连接到的池的更多详细信息,我也许可以就如何直接删除它们提供更多建议,而不必每次都检查。
推荐阅读
- reactjs - Firebase push() 函数抛出 .child() 不是函数错误
- python - 如何防止在Django中的每个测试用例之后刷新测试数据库中的数据
- c# - 如何使用 System.CodeDom 获取条件语句?: 不是 if-else 块
- docker - 如何使用运行 pm2 的 nodejs 为 docker 设置 pm2-logrotate?
- amazon-web-services - 如何将现有堆栈转换为嵌套堆栈
- cmake - Poco 项目的 CMake 构建中缺少 libcrypto.lib
- python - discord.ext.commands.errors.CommandInvokeError:命令引发异常:AttributeError:'Command'对象没有属性'strftime'
- r - 在 R 中绘制互惠尺度
- r - Parse 无法评估文本
- java - 我的客户数据在添加到我的列表时被截断