python - 芹菜队列同时获取多条消息
问题描述
我正在开发一个基于 docker 的 Celery 支持的 Python 应用程序,其中一个功能是触发和发送给定数字的文本消息。工作流程如下:
- 用户上传包含一组条目的 CSV,文本消息应发送到这些条目
- cron 作业每 60 秒轮询一次数据库以获取任何新条目并将它们添加到队列中
- 如果找到新条目,将它们放入队列并触发短信
目前,如果我上传一个包含 3 个条目的 CSV 文件,每个短信操作都会按顺序触发,而不是并行触发(默认 celery 进程行为)。例如,如果调度程序每 10 秒从队列中获取一个作业,则发送 3 条文本消息所用的时间将是 30 秒。 由于这些作业彼此独立,因此我想将其并行化,以便同时发送所有三个文本消息。
我尝试增加队列的并发性,但假设每个线程将被分配三个消息之一,但它不起作用。恐怕我可能缺少一些东西。是否需要添加一些其他配置才能并行化作业?
运行芹菜队列的命令
celery worker --app=worker.app --concurrency=5 --hostname=worker1@%h --loglevel=INFO-Q queue1 -Ofair
芹菜配置
app = Celery(
'worker',
broker=os.environ['CELERY_BROKER'],
backend=os.environ['RABBITMQ_BACKEND'],
include=['worker.tasks','worker.schedule']
)
app.conf.update(
result_expires=3600,
task_track_started=True,
worker_prefetch_multiplier = 5
)
app.conf.beat_schedule = {
"get-message": {
"task": "worker.schedule.get_new_messages",
"schedule": 10,
'options': {'queue' : 'queue1'}
}
}
解决方案
你可以使用芹菜中的组
组用于并行执行任务。group 函数接受签名列表。
可能有助于参考,请参阅下面的文档
https://sayari3.com/articles/18-chains-groups-and-chords-in-celery/
推荐阅读
- python-3.x - 如何使用 tkinter.Label 更改文本颜色
- git - 我们可以在不克隆 repo 的情况下获得最后的提交消息吗?
- python - 使用 Python 请求登录 Intuit Qucikbooks
- java - 如何在Android中制作编码字符串?
- java - 未找到 com.google.common.util.concurrent.ListenableFuture 的 Android Studio 类文件
- python - 如何计算给定值数组的百分位数?
- python - 在 Django 中安装 Pillow 时出错
- javascript - Javascript 模板文字和 Html 文本区域
- python - 请解释这个python代码的输出?
- laravel - Laravel 验证值范围