首页 > 解决方案 > Celery Periodic 任务没有结果和元数据

问题描述

我会想,如果有人会帮助解决我的问题。我使用 (Flask,Celery,RabbitMQ) 并且我想从 Celery 任务中获取结果。如果我作为芹菜工人(手动)而不是定期运行任务。AsyncResults 从该任务返回结果。但是,如果我将其作为定期任务运行。AsyncResult 返回 celery 对象,但结果和元数据为空。但我在 rabbitmq 的队列列表中看到了所有结果。你能帮我解决这个问题吗?

我使用这个简单的芹菜设置

app.py 

celery.conf.enable_utc = False
celery.conf.result_backend = 'rpc://'
celery.conf.broker = 'pyamqp://guest@localhost//'

celery_app.conf.beat_schedule = {
    "Calc_Month_Data": {
        "task": "orionapp.celerytask.calc_month_data",
        "schedule": crontab(hour=1,minute=45)
    }}

celerytask.py

@c.task
def calc_month_data():
    if sql.check_celery_task('Months'):
        count = 0
        for i,x in enumerate(api.months_calc_data()):
            if i == 0:
                count = x
                x = 0
            calc_month_data.update_state(state='PROGRESS', meta={"progress": x, "count":count})```

Stream.py

def get_active_task_id(task_name: str) -> str:
    try:
        task_id = None
        ins = c.control.inspect()
        active_task = ins.active()
        if active_task:
            for key,value in active_task.items():
                for x in value:
                    if x["name"] == task_name:
                        task_id = x["id"]
        else:
            log.add('get_active_task_id',f'Nepodarilo sa získať ID celery úlohy: {task_name}, inspect: {ins}, active task: {active_task}. Chyba: Niesu aktívne žiadne úlohy','warning')
        return task_id
    except Exception as err:
        log.add('get_active_task_id',f'Chyba pri získaní ID celery úlohy: {task_name}, inspect: {ins}, active task: {active_task}. Chyba: {err}','warning')
        return None

@celery_stream.route('/celery/months')
@User.check_auth([8])
def stream_months():
    def get_progress():
        data = {"progress":0,"count":0,"status":None}
        auto_task_id = get_active_task_id("orionapp.celerytask.calc_month_data")
        if not auto_task_id:
            data["status"] = "NoActive"
        else:
            task_id = auto_task_id
            task = c.AsyncResult(id=task_id)

#### If it is periodic task, so "task" has not attribute info or result...

            try:
                data["progress"] = task.info["progress"]
                data["count"] = task.info["count"]
                if int(task.info["progress"]) >= int(task.info["count"]):
                    data["status"] = 'Finished'
                else:
                    data["status"] = 'Pending'
            except TypeError:
                data["status"] = 'Pending'
        yield f'data: {json.dumps(data)}\n\n'
    return Response(stream_with_context(get_progress()),mimetype="text/event-stream")

标签: pythoncelerycelery-task

解决方案


推荐阅读