首页 > 解决方案 > 将参数传递给自定义 celery 任务

问题描述

我只是想知道将参数传递给自定义任务装饰器的正确方法是什么。例如,我发现我可以将 celery 任务子类化如下:

class MyTask(celery.Task):
    def __init__(self):
        # some custom stuff here
        super(MyTask, self).__init__()

    def __call__(self, *args, **kwargs):
        # do some custom stuff here
        res = self.run(*args, **kwargs)
        # do some more stuff here
        return res

并使用它如下:

@MyTask
def add(x, y):
    return x + y

但我希望能够将一个参数传递给这个任务,并让它根据参数(或等效地,根据它正在装饰的函数)表现不同。我可以想到两种方法来做到这一点。一种是通过将额外的 kwarg 传递给 celery 的任务包装器并明确指定基数,例如

@celery.task(base=MyTask, foo="bar")
def add(x, y):
    return x + y

我可以在我的自定义任务中访问它self.foo,但这对我来说有点像作弊。另一种方法是检查self.task,并根据其值更改行为,但这似乎也有点矫枉过正。理想情况下,我想将 kwarg 直接传递给自定义任务类,

@MyTask(foo="bar")
def add(x, y):
    return x + y

但是当然这会创建一个 的实例MyTask,我们不希望它也不会工作。

关于这样做的正确方法有什么建议吗?

标签: pythonargumentstaskcelerysubclass

解决方案


您可以添加使用类成员而不是实例成员。__init__因此,您将在 in之外定义您的参数MyTask,如下所示。然后,您可以将此类用作 celery 任务的基类,并将这些新的类成员用作自定义任务的参数。

注意:不幸的是,您不能将它们传递给__init__您,因为您需要MyTask在装饰时实例化。

class MyTask(celery.Task):
    foo = "default"

    def __init__(self):
        # some custom stuff here
        super(MyTask, self).__init__()

    def __call__(self, *args, **kwargs):
        # do some custom stuff here
        print(self.foo)
        res = self.run(*args, **kwargs)
        # do some more stuff here
        return res

然后你可以使用:

@celery.task(base=MyTask, foo="bar")
def add(x, y):
    return x + y

推荐阅读