首页 > 解决方案 > 带有额外参数的 Celery 自定义任务

问题描述

我目前正在从事一个有许多芹菜任务的项目,每个项目都有复杂的重试规则。

我们配置max_retriesretry_backoffice使用环境变量,如果max_retries超出,我们将消息放在“后备队列”中,如example_task.apply_async(queue=settings.EXAMPLE_FALLBACK_QUEUE, kwargs=kwargs). 问题是我们正在复制大量代码并使用以下代码混淆我们的任务:

@app.task
def example_task(**kwargs):
    try:
         ''' The real task business '''
         ...
    except Exception as e:
         # Here we have some logs
         # Also some checks about the nature of e and how to proceed

         # some cases we run:
         example_task.retry(
            exc=e, max_retries=max_retries, countdown=countdown
         )

         # and other cases we have
         example_task.apply_async(queue=settings.example_task_fallback_queue, kwargs=kwargs)

我的想法是用一些额外的参数扩展celery.Task类并做这样的事情:

@app.task(@app.task(ignore_result=False, bind=True, base=RetryTask(place_an_order_config))
def example_task(kwargs):
    '''Just the real business here'''

并实现一个类,如:

@dataclass
class RetryConfig:
    fallback_queue: str
    max_retries: int
    retry_backoff: int
    event_name: str
    traced_exceptions: List[Any]


class RetryTask(celery.Task):
    def __init__(self, config: RetryConfig):
        super().__init__()
        self.config = config

    def __call__(self, *args, **kwargs):
        try:
            self.run(*args, **kwargs)
        except Exception as e:
            if e in self.config.traced_exceptions:
                # Logs
                return

            # And here implement the retry/fallback logic
            # Using the parameterization attr `self.config` to
            # setup

问题是 arg baseinapp.task需要一个类(并且 celery 在内部实例化该类)而不是一个对象,所以我将RetryTask类更改为如下所示:

class RetryTask:  # Dont inhirit directly from celery.Task
    def __init__(self, config: RetryConfig):
        self = celery.Task()
        self.config = config

    def __call__(self, *args, **kwargs):
        try:
            self.run(*args, **kwargs)
        except Exception as e:
            if e in self.config.traced_exceptions:
                # Logs
                return

            # And here implement the retry/fallback logic
            # Using the parameterization attr `self.config` to
            # setup

这样我就可以将RetryTask对象传递给app.task装饰器并参数化我的任务。不幸的是,上面的代码TypeError: metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its basesapp.task函数检查时抛出task = type(fun.__name__, (base,), dict(...)

所以我的问题是:如何以正确的方式参数化我的自定义RetryTask类?

标签: pythonpython-3.xcelery

解决方案


推荐阅读