首页 > 解决方案 > 如何忽略任务失败的工作人员并将其任务重新分配给其他工作人员?

问题描述

我正在使用 client.map 在 N 个单线程工作人员(在 N 台机器上)的池上运行一个函数,其中一个工作人员失败了。我想知道是否有一种方法可以自动处理工作人员提出的异常,将其失败的任务重新分配给其他工作人员,并将其从池中忽略或排除?

我试过用下面显示的方法模拟这个问题。为了导致一名工作人员失败,我在其中提出了一个 OSError ,my_function提交给client.map如下:futures = client.map(my_function, range(100))在我的示例中,“Computer123”上的工作人员将失败。为了处理由 引发的异常my_function,我使用 sys.exit 在exception_handler. 因此,当工作人员的任务失败时,会调用 sys.exit。结果是坏的worker的distributed.nanny捕获了故障并重新启动了worker,而客户端重新分配了它失败的任务。但是一旦坏工人再次备份,它会再次接收任务,因为它仍在池中。它再次失败并重复该过程。随着它继续失败,最终其他工人完成了所有任务。如果我可以自动处理来自“Computer123”等不良工作人员的异常并将其从池中删除,那将是理想的。也许我需要做的就是将其从池中移除?

@exception_handler
def my_function(x):
  import socket 
  import time
  time.sleep(5)
  if socket.gethostname() == 'Computer123':
    raise(OSError)
  else:
    return x**2

def exception_handler(orig_func):
  def wrapper(*args,**kwargs):
    try:
      return orig_func(*args,**kwargs)
    except:
      import sys
      sys.exit(1)
  return wrapper

标签: pythondistributed-computingdaskdask-distributed

解决方案


作为一种解决方法,您可以保留一个坏工作人员的字典,每次确定它是坏的(可能在它引发一定数量的异常之后)时添加主机名。

然后,当您要发布某些任务时,请检查它是否在违规列表中。就像是:

  if socket.gethostname() in badHosts:
    skip
  else:
    do_something()

如果您可以提供有关如何管理连接到的池的更多详细信息,我也许可以就如何直接删除它们提供更多建议,而不必每次都检查。


推荐阅读