首页 > 解决方案 > 芹菜和烧瓶,同一个芹菜应用实例

问题描述

我正在尝试通过几个烧瓶端点从芹菜节拍中动态添加和删除任务。我创建了一个名为 myApp 的简单项目和一个名为 flaskr 的包(是的,就像教程一样),其中包含三个文件

myApp
    flaskr
        __init__.py
        routes.py
        tasks.py
    wsgi.py

这是端点代码

@route_blueprint.route('/myApp/add_task')
def add():
    print(celery.conf.beat_schedule)
    print(hex(id(celery)))
    celery.add_periodic_task(10.0, tasks.add.s(55, 2), name='add every 10')
    print(celery.conf.beat_schedule)
    return ""

我转到 PyCharm 控制台,然后从其中一个控制台运行 gunicorn,如下所示:

gunicorn wsgi:app -b localhost:8000

从另一个控制台选项卡我也像这样运行 Celery

celery -A flaskr.celery worker --loglevel=info

And from another I run beat like this

celery -A flaskr.celery beat -l=debug

当我点击端点时,在控制台中我可以看到正在添加的任务,但 beat 从不发送它。

我怀疑烧瓶设置任务是一个不同的 celery_app 实例,所以我打印了一个我试图修改的 celery 对象,是的,它是一个不同的对象。

这是从芹菜开始

烧瓶器:0x110048978

 -------------- celery@MacBook-Pro.local v4.3.0 (rhubarb)
---- **** ----- 
--- * ***  * -- Darwin-18.6.0-x86_64-i386-64bit 2019-08-26 17:19:47
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         flaskr:0x110048978
- ** ---------- .> transport:   redis://localhost:6379/2
- ** ---------- .> results:     redis://localhost:6379/2
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

这是来自端点

0x101e31e80

问题

我对python很陌生,但我想这是有道理的,因为我从两个不同的进程触发相同的代码,一个来自芹菜工人,另一个来自烧瓶/gunicorn,所以他们永远不会看到对方。

有没有办法让烧瓶访问从 celery 命令行实例初始化的实例,或者我应该从烧瓶内部启动工作人员?(我在 celery 和 flask 的任何文档中都没有看到)


这是完整的代码

__init__.py

from flask import Flask
from celery import Celery
import config

celery = Celery(__name__,
                backend=config.CELERY_BACKEND,
                broker=config.CELERY_BROKER,
                include=['flaskr.tasks'])


@celery.task
def asd(x, y):
    print('ADD')
    # raise exceptions.Retry(20)
    return x + y


def create_app(test_config=None):
    # create and configure the app
    app = Flask(__name__)

    from .routes import route_blueprint
    app.register_blueprint(route_blueprint)

    return app

任务.py

from __future__ import absolute_import, unicode_literals
from . import celery
import logging.config


logging.config.fileConfig('logging.conf')
logger = logging.getLogger('myApp')


@celery.task
def add(x, y):
    print('ADD')
    # raise exceptions.Retry(20)
    return x + y


@celery.task(bind=True)
def see_you(self, x, y):
    logger.info('Log de see_you')
    print(x)
    # print("See you in ten seconds!")


print('Initializing from tasks')
print(hex(id(celery)))
print('beat schedule: ' + str(celery.conf.beat_schedule))
# celery.add_periodic_task(10.0, add.s(1, 2), name='add every 10')
# print(str(celery.conf.beat_schedule))

路线.py

from flask import Blueprint
import logging.config
from . import tasks
from . import celery


route_blueprint = Blueprint('route_blueprint', __name__,)

logging.config.fileConfig('logging.conf')
logger = logging.getLogger('myApp')


@route_blueprint.route('/myApp/health')
def health():
    return "Health ok"


@route_blueprint.route('/myApp/add_task')
def add():
    print(celery.conf.beat_schedule)
    # tasks.add.delay(55, 2)
    print(hex(id(celery)))
    celery.add_periodic_task(10.0, tasks.add.s(55, 2), name='add every 10')
    print(celery.conf.beat_schedule)
    return "okkk"

标签: pythonflaskcelerycelerybeat

解决方案


推荐阅读