pika - Pika/RabbitMQ 如何从单个连接创建新通道?
问题描述
我有一个使用 python pica 的方法。这里消费者需要为每个队列创建一个通道。编写此代码时出现错误。
def Process(channel, exc, que):
channel.exchange_declare(exchange=exc, exchange_type='direct', durable=True)
result = channel.queue_declare(durable=True, queue=que, auto_delete=False,exclusive=False )
def callback_rabbit(ch,method,properties,body):
print("Message received = ", body)
channel.queue_bind(exchange=exc, queue=que, routing_key=que)
channel.basic_consume(on_message_callback=callback_rabbit,queue=que, auto_ack=True)
channel.start_consuming()
def Start():
credentials = pika.PlainCredentials('guest','guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1','5672','/',credentials))
items = {"exc":[{"exchangeName":"bam", "queueName":"bam_queue1"},{"exchangeName":"bam", "queueName":"bam_queue2"},{"exchangeName":"bam", "queueName":"bam_queue3"}]}
for item in items["exc"]:
channel = connection.channel()
t_msg = Thread(target=Process, args=(channel,item["exchangeName"],item["queueName"]))
t_msg.start()
解决方案
Pika 不是线程安全的。您应该在您的Process
方法中创建一个新的连接和通道。
注意: RabbitMQ 团队会监控rabbitmq-users
邮件列表,有时只会在 StackOverflow 上回答问题。
推荐阅读
- django - Stripe - 为客户创建新卡
- python - 我是否应该记录一个函数的所有参数、异常和返回值,即使它们已经记录在其他函数中?(Python)
- python - (572.15 - 595.00),(3,692.05 - 3,757.00) 如果我想从字符串转换为浮点数,为什么会这样?
- c# - .Net Core 3.1 MVC 具有身份功能,通过 IIS 运行时产生 404 错误
- javascript - 每次我打开模式时都会多次触发 Ajax 调用
- python - 我必须仅使用 for 循环从列表中查找丢失的数字,一旦我认为您会理解,请查看代码
- python - 在熊猫中随条件变化
- python - Python 3 中的浮点减法是不准确的
- angular - 在模板中显式设置时指令输入属性“未定义”
- tensorflow - 将 TensorFlow 转换为 PyTorch