python - 如何在装饰器下运行 celery-beat 任务?
问题描述
我有“储物柜”装饰器:
def lock_task(func):
def wrapper(*args, **kwargs):
if redis.set(func.__name__, 'lock', nx=True):
try:
result = func(*args, **kwargs)
finally:
redis.delete(func.__name__)
return result or True
else:
return 'Skipped'
return wrapper
我的装饰师也有芹菜任务:
@celery_app.task
@lock_task
def test():
call_command('test')
我有我的芹菜节拍设置:
celery_app.conf.beat_schedule = {
'test': {
'task': 'project.celery.test',
'schedule': crontab(minute='*/1')
}
}
开始后,我收到 KeyError Received 类型为“project.celery.test”的未注册任务。
如何称这个构造正确?
解决方案
似乎该wrapper
功能将是注册到 celery 而不是实际功能的test
功能。如果您在启动 celery worker 时看到此日志,则可以验证它:
$ celery --app=tasks worker --loglevel=INFO
...
[tasks]
. project.celery.wrapper
要注册实际任务的名称,请使用文档中的functools.wraps():
如果不使用此装饰器工厂,示例函数的名称将是“包装器”
from functools import wraps
def lock_task(func):
@wraps(func)
def wrapper(*args, **kwargs):
...
如果错误仍然存在,请确保您已正确配置:
- 芹菜进口例如
celery_app.conf.update(imports=['project.celery'])
或celery_app.conf.imports = ['project.celery']
- 或芹菜包括(示例)例如
celery_app = Celery(..., include=['project.celery'])
project.celery.test
为了验证,您应该在启动 celery worker 时看到您的任务命名(强调的是worker,而不是scheduler):
$ celery --app=tasks worker --loglevel=INFO
...
[tasks]
. project.celery.test
- 见最后一行。如果您使用 flag 调用 worker,它应该是可见的
--loglevel=INFO
。如果您没有test
在其中看到任务或看到wrapper
相反,那么上述步骤可能会有所帮助。
推荐阅读
- excel - 通过宏添加 excel IF 语句。编译时出错
- excel - 拆分字符串,但在所有拆分中考虑 1 个分隔符
- c# - 如果 C# 客户端未关闭,Python 服务器无法重新启动
- android - Android 数据计数器差异(应用总和 < 总数)
- java - 如何在 Java 中为 CompletableFuture 执行资源清理?
- scala - 为什么在 Spark 数据集中 null 转换为 -1.0
- python - ImportError:无法导入名称'password_reset'
- python - IPython 7.0.1 中的多行编辑中断
- php - graphql-php 谁通过 id 获取结果
- python - 在mac中更改tesseract的目录?