python - 如何检查处理 Celery 任务的队列
问题描述
我目前正在利用芹菜进行定期任务。我是芹菜新手。我有两个工人运行两个不同的队列。一种用于慢速后台作业,另一种用于用户在应用程序中排队的作业。
我在 datadog 上监控我的任务,因为这是确认我的工作人员正常运行的简单方法。
我想要做的是在每个任务完成后,记录任务在哪个队列上完成。
@after_task_publish.connect()
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
statsd.increment("celery.on_task_publish.start.increment")
task = celery.tasks.get(sender)
queue_name = task.queue
statsd.increment("celery.on_task_publish.increment", tags=[f"{queue_name}:{task}"])
以下功能是我在研究 celery 文档和一些 StackOverflow 帖子后实现的,但它没有按预期工作。我得到了第一个 statsd 增量,但剩余的代码没有执行。
我想知道是否有一种更简单的方法来检查内部/每个任务完成后,哪个队列处理了任务。
解决方案
由于您的问题说是否有办法在每个任务完成后/内部进行检查- 我假设您还没有尝试过这种 celery-result-backend 的东西。因此,您可以查看 Celery 本身提供的此功能:Celery-Result-Backend / Task-result-Backend
。它对于存储 celery 任务的结果非常有用。通读此 => https://docs.celeryproject.org/en/stable/userguide/configuration.html#task-result-backend-settings
一旦您了解如何设置此结果后端,请搜索result_extended
键(在同一链接中)以便能够添加queue-names
您的任务返回值。
可用的选项数量 - 就像您可以设置这些结果以转到其中任何一个:
Sql-DB / NoSql-DB / S3 / Azure / Elasticsearch / etc
我已经使用了这个Result-Backend
功能,Elasticsearch
并且我的任务结果是这样存储的:
只需settings.py
根据您的要求在文件中添加一些配置即可。非常适合我的应用程序。而且我有一个每周 cron 只清除successful results
任务 - 因为我们不再需要结果 - 我只能看到failed results
(如图像中的那个)。
这些是我要求的主要关键:task_track_started
以及task_acks_late
result_backend
推荐阅读
- hive - 如何快速获取 150 多个表的最近十天分区计数
- python - 拆分已经在列表中的字符串
- javascript - 为什么我收不到邮件?
- javascript - 使用 expressJS 通过电子邮件确认注册
- docker - 如何在 docker-compose for Windows server 2016 上修复“HNS 失败并出现错误:未指定错误”
- nats.io - NATS Streaming Server 在使用集群时被关闭
- javascript - BindSelect2 不适用于选择框
- loops - 如何解析 CF 结构
- python - Python代码问题-尝试创建一个脚本来定位文件,对其进行编辑并将其另存为新文件
- ruby-on-rails - 如何从 Rails 5 API only 应用程序提供本地图像 url