python - Celery/Flask 接收未注册类型的任务(App Factory + Blueprints)
问题描述
使用结合蓝图和 Flask Restful api 设置的 Flask App 工厂会导致 Celery 3.1.24 出现问题(我在 Windows 上)。
将@celery.task
装饰器放在 Flask-Restful 的资源类中会导致 Celery 在尝试运行 celery + redis 时无法检测到任何任务,因此它可以接受作业。
api_resource.py 示例
class Api(Resource)
def patch(self, received_key):
```do work here```
#start celery and task
""" @celery.task(name='api.internal.upvote_task')
def the_task(username, package, key_type, url):
ExampleClass.launch_tasks(buyer_username= username, package = package, key_type= key_type, url = url) """
the_task.apply_async(args=[username, package, key_type, url], countdown=10)
return used_key, 202
我什至尝试添加一个工人姓名@celery.task(name='api.internal.the_task')
我试着用
celery -A app worker -l info
celery worker -A app.celery --loglevel=info
celery -A app.api.internal worker --loglevel=DEBUG
这是我试图使用任务模块的直接路径并试图让 celery 自己检测它们。
在我重构我的应用程序并切换到应用程序工厂格式之前,我无法让 celery 检测到我的任务。
解决方案
我最终弄清楚如何解决问题的方法不是通过尝试 15 种不同的方法来强制 celery 通过绝对路径检测你的任务,将它放在 python 路径或任何东西中。
我实际上有点沮丧,因为经过几个小时的测试,我终于将我的 @celery.task 函数移到了 Resource 类之外,并从类中调用了它(在我的例子中是 api 端点)。
芹菜捡起它,我尝试了两种不同的方法,在我做出改变后都有效。第一个是设置一个 run.py/manage.py/celery_runnger.py,就像该教程中显示的那样。我早些时候尝试过,但它在课堂上不起作用。
我唯一的问题是我不想从那里推送应用程序上下文,也不想在该文件中导入 celery。
第二种方法是简单地在 celery 配置中使用包含
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL, include=['app.api.internal'])
这以前也没有工作。对某些人来说似乎很明显,但是将芹菜与所有这些不同的包装一起使用可能会很棘手,我希望这对某人有所帮助。
当任务从 celery 运行时,我仍然需要解决 Flask SQLA 不检测烧瓶应用程序上下文的问题,但这是一个不同的问题。