首页 > 解决方案 > 如何获得完成 Celery 任务所需的估计时间?

问题描述

我正在尝试获取完成 Celery 任务所需的估计时间。例如,如果用户在前端触发了任务 A(可能通过单击触发对后端的 REST API 调用的按钮),它将显示一个计时器来显示该任务还需要多长时间才能完成结束。

如果我确实正确阅读了 Celery 的文档,我可以获得计划任务的 ETA。但是刚刚执行的当前任务呢?

我最初的想法是将先前发生在数据库中的类似任务所花费的执行时间存储在数据库中,并可能做一个平均值作为估计所需时间。如果有人可以在上面的例子中分享一些经验,如果他/她这样做了,那就太好了。非常感谢!

标签: pythonrabbitmqceleryfastapi

解决方案


方法一:使用定时装饰器

import time
import functools


def timer(func):
    @functools.wraps(func)
    def _wrapper(*args, **kwargs):
        start = time.perf_counter()
        val = func(*args, **kwargs)
        end = time.perf_counter()
        elapsed_time = end - start
        task_info = {"time": elapsed_time, task_args: args, task_kwargs: kwargs}
        # do something with this task_info, i.e log or save in db to be used for analysis later
        return val

    return _wrapper


@app.task(name="task1")
@timer
def task1(...):
  ...

方法二:使用芹菜信号

import time
from celery.signals import task_prerun, task_postrun

tasks = {}


@task_prerun.connect
def task_prerun_handler(task_id, task, **extras):
    """ Dispatched before a task is executed. """
    tasks[task_id] = time.perf_counter()


@task_postrun.connect
def task_postrun_handler(task_id, task, **extras):
    """ Dispatched after a task has been executed. """
    end = time.perf_counter()
    elapsed_time = end - tasks.pop(task_id)
    # do something with this task_info, i.e log or save in db to be used for analysis later

您可以查看的资源:


推荐阅读