python - 如何让 Celery 工作人员使用“外部”RabbitMQ 队列?
问题描述
我有以下脚本:
celery_tasks.py
from celery import Celery
app = Celery(broker='amqp://guest:guest@localhost:5672//')
app.conf.task_default_queue = 'test_queue'
@app.task(acks_late=True)
def test(a):
return a
发布.py
from celery_tasks import test
test.delay('abc')
当我运行 publish.py 并启动工作程序(celery -A celery_tasks worker --loglevel=DEBUG)时,“abc”内容发布在“test_queue”中并被工作程序使用。
有没有办法让工作人员从 Celery 未发布的队列中消费一些东西?例如,当我直接通过 RabbitMQ 将某些内容放入 test_queue 中,而不通过 Celery 发布者,并运行 Celery 工作程序时,它给了我以下警告:
WARNING/MainProcess] 收到并删除未知消息。目的地错误?!?
邮件正文的完整内容是:body: 'abc' (3b)
{content_type:None content_encoding:None delivery_info:{'exchange': '', 'redelivered': False, 'delivery_tag': 1, 'consumer_tag': 'None2', 'routing_key': 'test_queue'} headers={}}
有没有办法解决这个问题?
解决方案
Celery 有一个特定的格式和一组需要维护以符合它的标题。因此,您必须对其进行逆向工程以使芹菜兼容的消息不是由芹菜产生的。请记住,celery 并不是真正用于通过代理发送消息,而是发送任务,这是增强的消息,因此在 amqp 消息的标头部分有额外内容
推荐阅读
- makefile - Makefile 处理文件夹的内容
- ios - Firebase 静默通知丢失 (iOS)
- java - 利用 Java 中的数组和方法的老虎机项目
- python - 将字符串数字列转换为 Pandas 中的浮点数
- android - 通过android每天设置闹钟8时钟不起作用
- react-beautiful-dnd - 使用 react-Beautiful-dnd,如何在拖动开始之前更改可拖动尺寸(onBeforeCapture)
- vim - 从语法集群中删除 @Spell
- python - ADF 使用 python 删除活动 - 无法创建日志记录
- postgresql - 如何连接到在 docker 中运行的 postgres 实例?
- vuejs2 - 如何在 Vuejs 中有条件地显示数据