首页 > 技术文章 > 07RabbitMQ_发布订阅者模式

clcloveHuahua 2018-01-08 16:13 原文

import pika

'''
    1.发布者不需要创建队列去存储要发布的信息.
    2.发布者将消息发送到RabbitMQ的转换器exchange中,并定义了exchange处理message的方式.
    3.发布订阅模式,如果在发布者发布消息的时候订阅者还未订阅,那么订阅者将接收不到消息
'''

# 1.创建一个socket
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
# 2.获取链接的管道
channel=connection.channel()
# 3.定义一个exchange转换器,将需要发布的消息发送的转换器上,将管道连接到定义的转换器上
# 4.设置类型为fanout: 所有bind到此exchange的queue都可以接收消息
channel.exchange_declare(
    exchange='logs',
    exchange_type='fanout'
)
# 5.创建需要发布的消息
message = '发布者要发布的消息1'
# 6.发布者开始发布消息
channel.basic_publish(
    exchange='logs',
    routing_key='',#这里必须为空
    body=message
)
print("发布者发布了消息:%s" % message)
# 7.关闭socket连接
connection.close()

  

import pika

'''
    1.订阅者必须声明一个queue类绑定到exchange(转换器)上
    2.exchange会遍历已经绑定到他身上的Queue,将消息存入queue中,消费者只会访问Queue
'''

# 1.定义连接RabbitMQ的socket
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 2.获取连接RabbitMQ的管道
channel = connection.channel()
# 3.定义要连接的exchange转换器,并设置类型
channel.exchange_declare(
    exchange='logs',
    exchange_type='fanout'
)

# 4.定义一个队列存储转换器存储的消息
# 不指定queue名字,rabbit会随机分配一个名字,
# exclusive(唯一的)=True会在使用此queue的消费者断开后,自动将queue删除
result = channel.queue_declare(exclusive=True)

# 5.获取队列的名称
queue_name = result.method.queue
# 6.为exchange转换器绑定队列.
channel.queue_bind(
    exchange='logs',
    queue=queue_name
)
print("订阅者开始等到消息:")


# 7.定义一个函数来处理消息
def callback(ch, method, properties, body):
    print('消息是:%s' % body.decode())
# 8.设置接收消息后的处理方法
channel.basic_consume(
    callback,
    queue=queue_name,
    no_ack=True
)
#9.开始接收消息
channel.start_consuming()

  

推荐阅读