python - 如何在运行时将队列添加到自定义消息使用者
问题描述
我有一个用例,我的 Django 应用程序需要处理来自外部源的消息。Celery 通过Custom Message Consumers支持这一点,这部分工作正常。
交换类型headers
使得消息可以根据条件路由到不同的队列。条件是通过 Django Admin 配置的。从下面的代码中,您可以看到,例如 QueueItem 可以随时激活/停用。
这是第一次加载工人时发现的。在新的 Rabbit 上,交换、队列和路由都已构建,并且自定义消费者正确地使用消息。
问题出在用户添加新 QueueItem 或翻转现有 QueueItem 的活动状态时。我可以动态创建新队列,但无论我多么努力地尝试自定义消费者都不会从中消耗。我希望该应用程序:
- 重新启动自定义消费者(我试过
app.control.broadcast('pool_restart')
) - 添加队列并使用自定义消费者使用它。(我试过
app.control.add_consumer(...)
) - 优雅地重新启动所有工作人员,当我手动工作以获取新设置时。
class MyConsumerStep(bootsteps.ConsumerStep):
alias = 'MyConsumerStep'
def get_queues(self):
from app.models import QueueItem
qs = QueueItem.objects.filter(active=True, mode=QueueItem.MODE_ASYNC, direction=QueueItem.DIRECTION_INBOUND)
return [Queue(q.name, Exchange('qbpubsub', 'headers'),
binding_arguments={
'var1': q.field1,
'var2': q.field2,
'x-match': 'any'
}) for q in qs]
def get_consumers(self, channel):
return [Consumer(channel,
queues=self.service_queues,
on_message=self.on_message,
accept=['json'])]
def on_message(self, message):
payload = message.decode()
print('Received message: {0!r} {props!r} rawlen={s}'.format(
payload, props=message.properties, s=len(message.body)
))
message.ack()
我已经成功地创建了一个新队列。我已经成功地将它附加到默认的 celery 消费者。我想以某种方式将它附加到自定义消费者,以便on_message
回调可以处理它。
我觉得我已经很接近了,但只是缺少了 Celery / Kombu API 的一小部分。
解决方案
推荐阅读
- android - Windows 错误:spawnSync ./gradlew 在模拟器上运行反应本机项目时出现 EACCES 错误
- javascript - setInterval() 没有删除之前绘制的图像
- javascript - 在 React 中使用 UseState 处理事件
- php - PHP将变量添加到json数组
- heroku - Heroku:pg_restore:[存档]文件头中不支持的版本(1.14)
- javascript - 位置无效:纬度必须是数字
- xamarin - Xamarin 形成动态通知
- php - 在 3 li 元素内循环 6 个帖子
- node.js - 如何让客户端从受密码保护的页面下载文件?
- python - Python PermissionError: [WinError 32] 进程无法访问文件.....但是我的文件已关闭