python - 如何获得完成 Celery 任务所需的估计时间?
问题描述
我正在尝试获取完成 Celery 任务所需的估计时间。例如,如果用户在前端触发了任务 A(可能通过单击触发对后端的 REST API 调用的按钮),它将显示一个计时器来显示该任务还需要多长时间才能完成结束。
如果我确实正确阅读了 Celery 的文档,我可以获得计划任务的 ETA。但是刚刚执行的当前任务呢?
我最初的想法是将先前发生在数据库中的类似任务所花费的执行时间存储在数据库中,并可能做一个平均值作为估计所需时间。如果有人可以在上面的例子中分享一些经验,如果他/她这样做了,那就太好了。非常感谢!
解决方案
方法一:使用定时装饰器
import time
import functools
def timer(func):
@functools.wraps(func)
def _wrapper(*args, **kwargs):
start = time.perf_counter()
val = func(*args, **kwargs)
end = time.perf_counter()
elapsed_time = end - start
task_info = {"time": elapsed_time, task_args: args, task_kwargs: kwargs}
# do something with this task_info, i.e log or save in db to be used for analysis later
return val
return _wrapper
@app.task(name="task1")
@timer
def task1(...):
...
方法二:使用芹菜信号
import time
from celery.signals import task_prerun, task_postrun
tasks = {}
@task_prerun.connect
def task_prerun_handler(task_id, task, **extras):
""" Dispatched before a task is executed. """
tasks[task_id] = time.perf_counter()
@task_postrun.connect
def task_postrun_handler(task_id, task, **extras):
""" Dispatched after a task has been executed. """
end = time.perf_counter()
elapsed_time = end - tasks.pop(task_id)
# do something with this task_info, i.e log or save in db to be used for analysis later
您可以查看的资源:
推荐阅读
- flutter - 使用 Velocity_x Flutter 通过 url 传递参数
- c - 为什么这段代码不起作用?我需要一个字符串和一个数字并向后打印字符串,并且对于每个字符我需要将它移动 n 次
- azure-functions - 如何获取类似于 Windows 任务调度程序的 Azure 函数计时器触发器的信息
- service - 在嵌套 js 微服务控制器中使用功能模块服务的问题
- ubuntu - Gtk-rs 错误:无法加载模块“appmenu-gtk-module”
- xml - 为什么我的模板不起作用?只有空行
- c# - 更改后面代码中的数据后如何刷新 XAML 窗口?
- firebase - 颤振发布请求创建错误无效的基数 10 数字(在字符 1)
- android - 在 Facebook Android SDK 中,如何使用 GraphRequest 获取 is_verified 标志?
- javascript - 如何在 django 中使用 Bootstrap Modal 发送更新的数据并保存它?