首页 > 技术文章 > RabbitMQ之路由

gsblog 2014-07-03 23:43 原文

为了实现一个新功能:只订阅消息的一个子集,例如只需要把严重的错误日志信息写入日志文件(存储到磁盘上),但同时仍然把所有的日志信息输出到控制台中。

 

绑定(Bindings)

创建绑定

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

建立exchange和queue之间的关系,简单理解是这个队列对这个交换器的消息感兴趣。

绑定的时候可以带上一个额外的routing_key参数,为了避免与basic_publish参数混淆,叫做binding key.

创建一个带binding key的绑定

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

binding key的含义取决于交换器的类型,对于fanout类型会忽略这个值。

 

Driect类型的交换器exchange

使用fanout类型的交换器exchange是广播类型,对于消息的过滤需要使用direct类型

使用direct类型的交换器,交换器将会对binding key和routing key进行精确匹配,从而确定消息该分发到哪个队列。

如上图,可以看到x交换器和两个队列进行绑定,第一个队列使用orange作为binding key,第二个队列有两个绑定,一个使用black,一个使用green。

这样,当routing key为orange的消息发布到交换器,会路由到队列Q1,black和green两个类型路由Q2,其他的所有消息将会被丢弃。

 

多个绑定(Multiple bindings)

多个队列使用相同的binding key是合法的。

可以添加一个x和Q1之间的绑定,也可以再添加一个x和Q2的绑定。这样以来,指定交换direct类型和fanout广播类型功能相同。

 

Emmiting logs

将会发送消息到一个direct exchange,把日志级别作为routing key。

这样负责处理接收的脚本可以选择要处理的日志级别。

 

创建一个direct类型的交换器

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

然后发送一条消息:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

severity值假定为info,warning,error中的一个

 

订阅(subscribing)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

为每一个日志级别创建一个新的绑定。

 

例子

将error信息发送到一个队列,将所有信息发送另一个队列

 

 emit_log

#!/usr/bin/env python
#-*- coding:utf8 -*-
import sys 
import pika
import logging

logging.basicConfig(format='%(levelname)s:$(message)s',level=logging.CRITICAL)

def emit_log():

    pika.connection.Parameters.DEFAULT_HOST = 'localhost'
    pika.connection.Parameters.DEFAULT_PORT = 5672
    pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = '/' 
    pika.connection.Parameters.DEFAULT_USERNAME = 'guosong'
    pika.connection.Parameters.DEFAULT_PASSWORD = 'guosong'

    para = pika.connection.Parameters()

    connection = pika.BlockingConnection(para)

    channel = connection.channel()
    #声明一个direct_logs交换器,类型为direct
    channel.exchange_declare(exchange='direct_logs',type='direct')

    #指定日志级别
    serverity = sys.argv[1] if len(sys.argv) >1 else 'info'

    message = '.'.join(sys.argv[1:]) or "info:Hello World!"

    #发送的时候指定routing_key为空,没有绑定队列到交换器上,消息将会丢失
    #对于日志类消息,如果没有消费者监听的话,这些消息就会忽略
    channel.basic_publish(exchange='logs',routing_key=serverity,body=message)

    #%r也是string类型
    print "[x] Sent %r" % (message,)

    connection.close()

if __name__ == '__main__':                                                                   
    emit_log()

 接收者

#!/usr/bin/env python
#-*- coding:utf8 -*-
import sys
import pika
import logging

logging.basicConfig(format='%(levelname)s:$(message)s',level=logging.CRITICAL)

#回调函数,处理消息                                                                          
def callback(ch, method, properties, body):
    print " [x] %r " % (body,)

def receive_logs():
    pika.connection.Parameters.DEFAULT_HOST = 'localhost'
    pika.connection.Parameters.DEFAULT_PORT = 5672
    pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = '/'
    pika.connection.Parameters.DEFAULT_USERNAME = 'guosong'
    pika.connection.Parameters.DEFAULT_PASSWORD = 'guosong'

    para = pika.connection.Parameters()
    connection = pika.BlockingConnection(para)
    channel = connection.channel()

    #声明一个logs交换器,类型为fanout,不允许发布消息到不存在的交换器
    channel.exchange_declare(exchange='direct_logs',type='direct')

    #声明一个随机队列,设置exclusive=True,在该consumer退出的时候,对应的队列被删除
    result = channel.queue_declare(exclusive=True)
    #获取随机队列的名称
    queue_name = result.method.queue

    serverities = sys.argv[1:]
    if not serverities:
        print >> sys.stderr, "Usage: %s [info] [warning] [error]" %  sys.argv[0]
        sys.exit(1)

    for serverity in serverities:
         #绑定交换器和队列
        channel.queue_bind(exchange='logs',queue=queue_name,routing_key=serverity)
    print '[*] Wating for logs.To exit press CTRL+C'

    #开始消费消息
    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    channel.start_consuming()

if __name__ == '__main__':
    receive_logs()

如果只希望保存warning和error级别的日志到磁盘,只需要打开控制台并输入:

guosong@guosong:~/code/rabbitmq/ch4$ ./receive_logs.py warning error >logs

如果需要所有的话,执行如下命令:

guosong@guosong:~/code/rabbitmq/ch4$ ./receive_logs.py info warning error >logs

  

 

 

 

推荐阅读