首页 > 解决方案 > 是否可以在任务本身的包装器中调用 celery 任务的延迟?

问题描述

我有以下配置:

class Foo:
    def __init__(self, services: List[str]):
        self.services = services

    def enqueuer(self, func):
        def wrapper(*args, **kwargs):
            if not args[1] in self.services:
                raise ForbiddenServiceError
            return func.delay(*args, **kwargs)

        return wrapper

    @enqueuer
    @celery_app.task()
    def foo_task(self, data, service: str):
        time.sleep(10)
        print(f"Received the service: {service} with data: {data}")

foo = Foo(["service1", "service2"])
foo.foo_task("Dummy data", "service1")

此配置是否会与调用func.delay(*args, **kwargs)?

因为,我得到的错误是TypeError: enqueuer() missing 1 required positional argument: 'func'

任何帮助深表感谢!

标签: pythoncelerypython-decoratorscelery-task

解决方案


推荐阅读