amazon-web-services - Celery Redis 后端 - 使队列中的任务作为项目存在
问题描述
当前设置: celery 在 EC2 节点上的 docker 容器(使用我们的产品代码)上运行,创建和处理任务。我们的后端/代理是 Redis,在 AWS 的 elasticache 中运行。
目标:能够在任何给定时间查看队列大小(类似于花的监控),希望通过 AWS CloudWatch,但不是必需的。任务的内容不相关,因为我熟悉备份 redis 实例,并且可以使用本地工具解析备份以进行任何需要的分析。短期历史数据是非常受欢迎的(CloudWatch 可以追溯到 2 周,并且具有 1 分钟数据点的粒度,这非常好)。
根据我对 Flower 工作方式的了解,由于我们目前实施的安全组/限制数量众多,Flower 无法使用。此外,flower 仅在您在页面上时进行监控,因此不会保存任何历史数据。
Elasticache 已经在 CloudWatch 中为 redis 中的项目数量构建了。在我看来,这似乎是实现目标的最佳途径。但是目前队列代表redis中的一项(无论队列中有多少任务)。以下是解析为 json 的 redis 备份示例:
[{
"1.api_data__cached_api_route.000":"{\"i1\": 0, \"i2\": 1, \"i3\": 0}",
"1.api_data__cached_api_route.001":"{\"i1\": 0, \"i2\": 0, \"i3\": 0}",
"1.api_data__cached_api_route.002":"{\"i1\": 1, \"i2\": 1, \"i3\": 0}",
"staging_event_default":["{\"id\":\"b28b056c-1268-4577-af8a-9f1948860502\", \"task\":{...}}, "{\"id\":\"52668c46-3972-457a-be3a-6e27eedd26e3\", \"task\":{...}}]
}]
Cloudwatch 将其视为 4 个项目、3 个缓存的 api 路由和 1 个队列。即使队列有数千个项目,它仍会显示为 4 个项目。#(items in queue) 和 #(items in queue AND other cached items) 之间的差异很好,因为这个监控工具将主要用于查看队列是否得到可怕的备份,并且队列的大小将相形见绌缓存的 api 路由的数量。
要继续这条路线,最简单的答案是如果 celery 有一个配置选项可以使队列中的每个项目成为自己的 redis 项目。如果有一个简单的修复或配置选项,它似乎是最容易实现的。这是我们当前的 celery 配置选项:
flask_app.config.update(
CELERY_BROKER_URL=redis_host,
CELERY_RESULT_BACKEND=redis_host,
CELERY_QUEUES=queue_manager.queues,
CELERY_ROUTES=queue_manager.routes,
CELERY_DEFAULT_QUEUE=queue_manager.default_queue_name,
CELERY_DEFAULT_EXCHANGE=queue_manager.default_exchange_name)
_celery = celery.Celery(flask_app.import_name,
backend=flask_app.config['CELERY_RESULT_BACKEND'],
broker=flask_app.config['CELERY_BROKER_URL'])
opts = {
'broker_url': redis_host,
'result_backed': redis_host,
'task_queues': queue_manager.queues,
'task_routes': queue_manager.routes,
'task_default_queue': queue_manager.default_queue_name,
'task_default_exchange': queue_manager.default_exchange_name,
'worker_send_task_events': True,
'task_ignore_result': True,
'task_store_errors_even_if_ignored': True,
'result_expires': 3600,
'worker_redirect_stdouts': False,
'beat_scheduler': redbeat.RedBeatScheduler,
'beat_max_loop_interval': 5
}
_celery.conf.update(opts)
我遇到的另一个选项是celery-cloudwatch-logs,这似乎与我想要实现的目标一致,但似乎更旨在查看每个任务的特定内容,而不是汇总(但是我在那里可能是错的)。
如果没有满足目标的完美/简单的解决方案,我将研究 celery-cloudwatch 的分叉/构建以仅上传相关信息。我们的团队继承了目前存在的大部分代码,我对 celery 的工作原理有基本的了解,但并不深入。
提前感谢任何人的想法、评论和帮助!
解决方案
如果有人碰巧遇到它,我会在这里发布我所做的。
我们已经为应用程序其他地方的 S3 访问安装和配置了 boto3,因此很容易发布到 CloudWatch。
我在我们的类中添加了一个方法来使用来自 redis 模块Celery
的来检查队列的长度:llen
@staticmethod
def check_lengths():
result = {}
for q in Celery._queues:
result[q] = Celery._redis.llen(q)
return result
然后发布到 Cloudwatch 也相当容易:
namespace = "Celery/Queue"
metrics = []
for qname, qlen in data.items():
metric = {}
metric["MetricName"] = "ItemsInQueue"
metric["Dimensions"] = [ {"Name": "QueueName", "Value": qname} ]
metric["Value"] = qlen
metric["Unit"] = "Count"
metrics.append(metric)
self.cw_client.put_metric_data(Namespace=namespace, MetricData=metrics)
然后,我最终使用 AWS Lambda 即时向端点发送网络请求,然后将上述数据发布到 CloudWatch。
推荐阅读
- vue.js - 如何在 vuejs 和 vue-router 中将 1 条应用数据记录传递给路由
- javascript - 如何在不通过循环的情况下将一个对象与 JavaScript 中的其他对象内部值合并?
- javascript - Hyperledger 作曲家在 for 循环中创建多个资产
- r - stringr::str_wrap 不完全换行每 n 个字符
- razor - 从 ASP.NET MVC 5 的 Razor 循环中生成的复选框获取值的正确方法是什么?
- jquery - contextmenu() 与数据表?
- amazon-web-services - 用于查询 S3 存储桶的 AWS CLI 命令
- python - 使用 CSV/Excel/SPSS 文件使用 pandas 创建表/报告
- html - 带有点击显示功能的 HTML 电子邮件?
- java - 在 Java 中包装长泛型类型的约定是什么?