Retries in dask.compute() are unclear

Question

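From the documentation: "Number of allowed automatic retries if computing a result fails."

Does "result" refer to each individual task or to the entire compute() call?

If it refers to the entire call, how can I retry each individual task created with dask.delayed?

Also, judging by the code below, I'm not sure retries work at all.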

import dask
import random

@dask.delayed
def add(x, y):
    return x + y

@dask.delayed
def divide(sum_i):
    n = random.randint(0, 1)
    result = sum_i / n
    return result

tasks = []
for i in range(3):
    sum_i = add(i, i+1)
    divide_n = divide(sum_i)
    tasks.append(divide_n)

dask.compute(*tasks, retries=1000)

The expected output is (1.0, 3.0, 5.0), but the actual result is a ZeroDivisionError.
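The two readings of "result" in the question can be made concrete with a pure-Python simulation. No dask is involved, and retry_whole_call, retry_per_task and flaky_divide are hypothetical helpers written only for illustration. One hedged observation: the quoted sentence appears to come from the dask.distributed Client docstring, and a plain dask.compute() call runs on a local scheduler that, as far as I can tell, does not act on retries, which would explain the ZeroDivisionError above.

```python
import random


def flaky_divide(total, rng):
    # Mirrors the question's divide(): n is 0 or 1 at random,
    # so roughly half of the calls raise ZeroDivisionError.
    n = rng.randint(0, 1)
    return total / n


def retry_whole_call(totals, retries, rng):
    # Reading 1: if any task fails, rerun *all* tasks from scratch.
    for attempt in range(retries + 1):
        try:
            return tuple(flaky_divide(t, rng) for t in totals)
        except ZeroDivisionError:
            if attempt == retries:
                raise  # out of retries: surface the last error


def retry_per_task(totals, retries, rng):
    # Reading 2: each task is retried on its own; finished tasks are kept.
    results = []
    for t in totals:
        for attempt in range(retries + 1):
            try:
                results.append(flaky_divide(t, rng))
                break
            except ZeroDivisionError:
                if attempt == retries:
                    raise
    return tuple(results)


# Same sums as add(i, i+1) for i in range(3): [1, 3, 5]
totals = [i + (i + 1) for i in range(3)]
print(retry_whole_call(totals, retries=1000, rng=random.Random(0)))
print(retry_per_task(totals, retries=1000, rng=random.Random(0)))
```

With retries=1000 both versions almost surely end with (1.0, 3.0, 5.0). The difference is cost: rerunning the whole call repeats tasks that already succeeded, while per-task retries only repeat the task that failed.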

Tags: dask, dask-delayed

Answer


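In case anyone is interested, each task can be wrapped in a @retry decorator, as shown below. Put @retry underneath @dask.delayed so that the retries run inside each task when it executes; in the reverse order, the retried call would only build the lazy Delayed object and never see the exception: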

@dask.delayed
@retry(Exception, tries=3, delay=5)
def my_func():
    pass

The retry decorator:

import logging
from functools import wraps
from time import sleep

def retry(exceptions, tries=4, delay=3, backoff=2, logger=None):
    """
    Retry calling the decorated function using an exponential backoff.

    Args:
        exceptions: The exception to check. May be a tuple of
            exceptions to check.
        tries: Number of times to try (not retry) before giving up.
        delay: Initial delay between retries in seconds.
        backoff: Backoff multiplier (e.g. value of 2 will double the delay
            each retry).
        logger: Logger to use.

    """
    if not logger:
        logger = logging.getLogger(__name__)

    def deco_retry(f):
        @wraps(f)
        def f_retry(*args, **kwargs):
            mtries, mdelay = tries, delay
            while mtries > 1:
                try:
                    return f(*args, **kwargs)
                except exceptions as e:
                    msg = f"{e}, \nRetrying in {mdelay} seconds..."
                    logger.warning(msg)
                    sleep(mdelay)
                    mtries -= 1
                    mdelay *= backoff
            return f(*args, **kwargs)
        return f_retry  # true decorator

    return deco_retry
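To see what tries, delay and backoff mean in practice, here is a small helper (hypothetical, not part of the decorator) that replays the loop in f_retry and lists the sleeps it would perform if every attempt failed. With the defaults, the function is called four times with sleeps of 3, 6 and 12 seconds between attempts; the @retry(Exception, tries=3, delay=5) usage above means three calls with sleeps of 5 and 10 seconds.

```python
def backoff_schedule(tries, delay, backoff):
    """Return the sleep durations retry() would use if every attempt failed.

    Mirrors the while-loop in f_retry: one sleep after each failed attempt
    except the final one, whose exception propagates to the caller.
    """
    delays = []
    mtries, mdelay = tries, delay
    while mtries > 1:
        delays.append(mdelay)
        mtries -= 1
        mdelay *= backoff
    return delays


print(backoff_schedule(4, 3, 2))  # decorator defaults -> [3, 6, 12]
print(backoff_schedule(3, 5, 2))  # @retry(Exception, tries=3, delay=5) -> [5, 10]
```

For the question's divide(), each attempt fails with probability 1/2, so with tries=3 a single task still gives up with probability 1/8; raise tries if that is too often.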
