首页 > 解决方案 > 如何让 Celery 工作人员使用“外部”RabbitMQ 队列?

问题描述

我有以下脚本:

celery_tasks.py

from celery import Celery
app = Celery(broker='amqp://guest:guest@localhost:5672//')
app.conf.task_default_queue = 'test_queue'

@app.task(acks_late=True)
def test(a):
   return a

发布.py

from celery_tasks import test
test.delay('abc')

当我运行 publish.py 并启动工作程序(celery -A celery_tasks worker --loglevel=DEBUG)时,“abc”内容发布在“test_queue”中并被工作程序使用。

有没有办法让工作人员从 Celery 未发布的队列中消费一些东西?例如,当我直接通过 RabbitMQ 将某些内容放入 test_queue 中,而不通过 Celery 发布者,并运行 Celery 工作程序时,它给了我以下警告:

WARNING/MainProcess] 收到并删除未知消息。目的地错误?!?

邮件正文的完整内容是:body: 'abc' (3b)

{content_type:None content_encoding:None delivery_info:{'exchange': '', 'redelivered': False, 'delivery_tag': 1, 'consumer_tag': 'None2', 'routing_key': 'test_queue'} headers={}}

有没有办法解决这个问题?

标签: pythonrabbitmqcelery

解决方案


Celery 有一个特定的格式和一组需要维护以符合它的标题。因此,您必须对其进行逆向工程以使芹菜兼容的消息不是由芹菜产生的。请记住,celery 并不是真正用于通过代理发送消息,而是发送任务,这是增强的消息,因此在 amqp 消息的标头部分有额外内容


推荐阅读