首页 > 解决方案 > 如何使用python解决Rabbitmq中的消费者错误

问题描述

当我使用python在rabbitmq中为headersExchange运行Consumer.py时,出现如下错误我在下面提到了消费者和发布程序

Traceback (most recent call last):
  File "headersConsumer.py", line 32, in <module>
    main()
  File "headersConsumer.py", line 14, in main
    channel.exchange_declare(exchange = 'headers_logs',exchange_type='headers',durable=True)
  File "C:\Python38\lib\site-packages\pika\adapters\blocking_connection.py", line 2387, in 
exchange_declare
    self._flush_output(declare_ok_result.is_ready)
  File "C:\Python38\lib\site-packages\pika\adapters\blocking_connection.py", line 1339, in 
_flush_output
    raise self._closing_reason  # pylint: disable=E0702
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED 
- inequivalent arg 'type' for exchange 'headers_logs' in vhost '/': received 'headers' but 
current is 'fanout'")

我写过这样的消费者代码

#!/usr/bin/env python
import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()    

    channel.exchange_declare(exchange = 'headers_logs',exchange_type='headers',durable=True)

    channel.queue_declare(queue = "HeaderQueue1", durable=True)

    channel.queue_bind(exchange = 'headers_logs', queue="HeadersQueue1", routing_key='',  
arguments={'x-match': 'any', 'key1': 'one', 'key2': 'two'})

    def callback(ch, method, properties, body):
        print(" [x] %r" % body.decode())

    print(' [*] Waiting for logs. To exit press CTRL+C')
    channel.basic_consume(
        queue="HeadersQueue1", on_message_callback=callback, auto_ack=True)

    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

我写过这样的发布程序

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='headers_logs',exchange_type='headers')

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish((exchange='headers_logs',routing_key="",body=message,properties=pika.BasicProperties(
        delivery_mode = 2, # make message persistent
        headers = {'key1':'one', 'key2': 'three'}
    ))

print(" [x] Sent %r" % message)
connection.close()

我不明白这个错误,谁能建议这个错误

标签: rabbitmq

解决方案


PRECONDITION_FAILED意味着您使用一组参数声明了一个交换,然后您尝试使用不同的参数创建相同的队列名称。

在你的情况下:

headers_logs' in vhost '/': received 'headers' but 
current is 'fanout'")

所以您正在尝试将交换类型从扇出更改为标头

在此处了解更多详细信息(这是针对队列的,但交换的工作方式相同)。

在使用队列之前,必须先声明它。如果队列不存在,则声明队列将导致它被创建。如果队列已经存在并且其属性与声明中的相同,则声明将无效。当现有队列属性与声明中的不同时,将引发代码为 406 (PRECONDITION_FAILED) 的通道级异常。


推荐阅读