import pika ''' 1.发布者不需要创建队列去存储要发布的信息. 2.发布者将消息发送到RabbitMQ的转换器exchange中,并定义了exchange处理message的方式. 3.发布订阅模式,如果在发布者发布消息的时候订阅者还未订阅,那么订阅者将接收不到消息 ''' # 1.创建一个socket connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) # 2.获取链接的管道 channel=connection.channel() # 3.定义一个exchange转换器,将需要发布的消息发送的转换器上,将管道连接到定义的转换器上 # 4.设置类型为fanout: 所有bind到此exchange的queue都可以接收消息 channel.exchange_declare( exchange='logs', exchange_type='fanout' ) # 5.创建需要发布的消息 message = '发布者要发布的消息1' # 6.发布者开始发布消息 channel.basic_publish( exchange='logs', routing_key='',#这里必须为空 body=message ) print("发布者发布了消息:%s" % message) # 7.关闭socket连接 connection.close()
import pika ''' 1.订阅者必须声明一个queue类绑定到exchange(转换器)上 2.exchange会遍历已经绑定到他身上的Queue,将消息存入queue中,消费者只会访问Queue ''' # 1.定义连接RabbitMQ的socket connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) # 2.获取连接RabbitMQ的管道 channel = connection.channel() # 3.定义要连接的exchange转换器,并设置类型 channel.exchange_declare( exchange='logs', exchange_type='fanout' ) # 4.定义一个队列存储转换器存储的消息 # 不指定queue名字,rabbit会随机分配一个名字, # exclusive(唯一的)=True会在使用此queue的消费者断开后,自动将queue删除 result = channel.queue_declare(exclusive=True) # 5.获取队列的名称 queue_name = result.method.queue # 6.为exchange转换器绑定队列. channel.queue_bind( exchange='logs', queue=queue_name ) print("订阅者开始等到消息:") # 7.定义一个函数来处理消息 def callback(ch, method, properties, body): print('消息是:%s' % body.decode()) # 8.设置接收消息后的处理方法 channel.basic_consume( callback, queue=queue_name, no_ack=True ) #9.开始接收消息 channel.start_consuming()