首页 > 解决方案 > 如何在 celery 中为每个用户生成队列?

问题描述

所以我试图将网络请求中的阻塞内容作为后台任务和利用队列移动。我也是消息传递和发布/订阅的新手。用户在那里推送数据并对其进行处理,然后通知用户。我为此做了一个 celery 设置,发现它不能满足我的用例,因为每个用户都有自己的任务专用队列。

我尝试指定丢失队列的创建和工作人员生成期间(发送以逗号分隔的队列名称),并将它们列在队列设置中,如先前通过互联网回答“使用芹菜创建动态队列”中所述。它创建队列,但当我指定与设置和命令行中指定名称不同的队列名称时,它不会。解决方案是使用不满足用例的队列名称生成更多工作人员,因为将有数百万个数据处理请求。

我发现 python-rq 具有 Queue 对象初始化及其名称,我认为它创建了新队列。如果是这样,转向 RQ 是否正确?

redis_conn = Redis()
q = Queue('some_queue', connection=redis_conn)

我想要的是每个用户在后台为他们自己的任务排队。我在 celery 中没有看到任何在线创建动态队列的解决方案(没有在命令行中指定队列名称或设置)。python-rq 似乎有这个解决方案。我的权衡是从 RabbitMQ 和 celery 迁移到 redis。

有没有办法在芹菜中真正做到每个用户队列?如果是,请列出步骤。还是这种设计模式不正确?pubsub 会满足用例吗?

标签: djangoredisrabbitmqcelery

解决方案


Celery worker 仅使用通过task_queues设置定义的队列或在命令行中使用-Q选项给出的队列。但是,这可以从命令行或代码动态更改。只需确保task_create_missing_queues在设置中启用(这是默认设置),以便在您开始使用它们时自动创建新队列。

因此,在为给定用户发送任务之前,您必须指示工作人员开始从用户队列中消费。这可以通过使用add_consumer控制命令从命令行实现,也可以通过使用app.control.add_consumer()方法的代码实现。这些操作是幂等的,因此如果工作人员已经从队列中消费,则不会发生任何事情。如果队列尚不存在,则会自动创建。


推荐阅读