首页 > 解决方案 > 如何在装饰器下运行 celery-beat 任务?

问题描述

我有“储物柜”装饰器:

def lock_task(func):
    def wrapper(*args, **kwargs):
        if redis.set(func.__name__, 'lock', nx=True):
            try:
                result = func(*args, **kwargs)
            finally:
                redis.delete(func.__name__)
            return result or True
        else:
            return 'Skipped'
    return wrapper

我的装饰师也有芹菜任务:

@celery_app.task
@lock_task
def test():
    call_command('test')

我有我的芹菜节拍设置:

celery_app.conf.beat_schedule = {
    'test': {
        'task': 'project.celery.test',
        'schedule': crontab(minute='*/1')
    }
}

开始后,我收到 KeyError Received 类型为“project.celery.test”的未注册任务。

如何称这个构造正确?

标签: pythondjangorediscelery

解决方案


似乎该wrapper功能将是注册到 celery 而不是实际功能的test功能。如果您在启动 celery worker 时看到此日志,则可以验证它:

$ celery --app=tasks worker --loglevel=INFO
...
[tasks]
  . project.celery.wrapper

要注册实际任务的名称,请使用文档中的functools.wraps()

如果不使用此装饰器工厂,示例函数的名称将是“包装器”

from functools import wraps

def lock_task(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
    ...

如果错误仍然存​​在,请确保您已正确配置:

  • 芹菜进口例如celery_app.conf.update(imports=['project.celery'])celery_app.conf.imports = ['project.celery']
  • 或芹菜包括示例)例如celery_app = Celery(..., include=['project.celery'])

project.celery.test为了验证,您应该在启动 celery worker 时看到您的任务命名(强调的是worker,而不是scheduler):

$ celery --app=tasks worker --loglevel=INFO
...
[tasks]
  . project.celery.test
  • 见最后一行。如果您使用 flag 调用 worker,它应该是可见的--loglevel=INFO。如果您没有test在其中看到任务或看到wrapper相反,那么上述步骤可能会有所帮助。

推荐阅读