首页 > 解决方案 > 芹菜队列同时获取多条消息

问题描述

我正在开发一个基于 docker 的 Celery 支持的 Python 应用程序,其中一个功能是触发和发送给定数字的文本消息。工作流程如下:

  1. 用户上传包含一组条目的 CSV,文本消息应发送到这些条目
  2. cron 作业每 60 秒轮询一次数据库以获取任何新条目并将它们添加到队列中
  3. 如果找到新条目,将它们放入队列并触发短信

目前,如果我上传一个包含 3 个条目的 CSV 文件,每个短信操作都会按顺序触发,而不是并行触发(默认 celery 进程行为)。例如,如果调度程序每 10 秒从队列中获取一个作业,则发送 3 条文本消息所用的时间将是 30 秒。 由于这些作业彼此独立,因此我想将其并行化,以便同时发送所有三个文本消息。

我尝试增加队列的并发性,但假设每个线程将被分配三个消息之一,但它不起作用。恐怕我可能缺少一些东西。是否需要添加一些其他配置才能并行化作业?

运行芹菜队列的命令

celery worker --app=worker.app --concurrency=5 --hostname=worker1@%h --loglevel=INFO-Q queue1 -Ofair

芹菜配置


app = Celery(
    'worker',
    broker=os.environ['CELERY_BROKER'],
    backend=os.environ['RABBITMQ_BACKEND'],
    include=['worker.tasks','worker.schedule']
)



app.conf.update(
    result_expires=3600,
    task_track_started=True,
    worker_prefetch_multiplier = 5
)

app.conf.beat_schedule = {
    "get-message": {
        "task": "worker.schedule.get_new_messages",
        "schedule": 10,
        'options': {'queue' : 'queue1'}
    }
}

标签: pythondockerrabbitmqcelery

解决方案


你可以使用芹菜中的组

组用于并行执行任务。group 函数接受签名列表。

可能有助于参考,请参阅下面的文档

https://sayari3.com/articles/18-chains-groups-and-chords-in-celery/


推荐阅读