首页 > 解决方案 > Celery/Flask 接收未注册类型的任务(App Factory + Blueprints)

问题描述

使用结合蓝图和 Flask Restful api 设置的 Flask App 工厂会导致 Celery 3.1.24 出现问题(我在 Windows 上)。

@celery.task装饰器放在 Flask-Restful 的资源类中会导致 Celery 在尝试运行 celery + redis 时无法检测到任何任务,因此它可以接受作业。

api_resource.py 示例

   class Api(Resource)

    def patch(self, received_key):
       ```do work here```


        #start celery and task
        """ @celery.task(name='api.internal.upvote_task')
        def the_task(username, package, key_type, url):

            ExampleClass.launch_tasks(buyer_username= username, package = package, key_type= key_type, url = url) """

        the_task.apply_async(args=[username, package, key_type, url], countdown=10)
        return used_key, 202

我什至尝试添加一个工人姓名@celery.task(name='api.internal.the_task')

我试着用

celery -A app worker -l info


celery worker -A app.celery --loglevel=info


celery -A app.api.internal worker --loglevel=DEBUG  

这是我试图使用任务模块的直接路径并试图让 celery 自己检测它们。

在我重构我的应用程序并切换到应用程序工厂格式之前,我无法让 celery 检测到我的任务。

标签: pythonflaskcelery

解决方案


我最终弄清楚如何解决问题的方法不是通过尝试 15 种不同的方法来强制 celery 通过绝对路径检测你的任务,将它放在 python 路径或任何东西中。

我实际上有点沮丧,因为经过几个小时的测试,我终于将我的 @celery.task 函数移到了 Resource 类之外,并从类中调用了它(在我的例子中是 api 端点)。

芹菜捡起它,我尝试了两种不同的方法,在我做出改变后都有效。第一个是设置一个 run.py/manage.py/celery_runnger.py,就像该教程中显示的那样。我早些时候尝试过,但它在课堂上不起作用。

我唯一的问题是我不想从那里推送应用程序上下文,也不想在该文件中导入 celery。

第二种方法是简单地在 celery 配置中使用包含 celery = Celery(__name__, broker=Config.CELERY_BROKER_URL, include=['app.api.internal'])

这以前也没有工作。对某些人来说似乎很明显,但是将芹菜与所有这些不同的包装一起使用可能会很棘手,我希望这对某人有所帮助。

当任务从 celery 运行时,我仍然需要解决 Flask SQLA 不检测烧瓶应用程序上下文的问题,但这是一个不同的问题。


推荐阅读