首页 > 技术文章 > kombu操作RabbitMQ带优先级例子

clbao 原文

Kombu 是一个 Python 消息传递库，为 AMQP 协议提供了高层接口，可用于操作 RabbitMQ 等消息中间件。

安装Kombu

pip install kombu


Producer
import time
from kombu.entity import Exchange, Queue
from kombu.messaging import Producer
from kombu.connection import Connection

# Producer demo: publish nine JSON messages (size 1..9) to each of two
# queues -- a plain queue and one declared with max_priority=10.
with Connection('amqp://guest:guest@127.0.0.1:5672/test') as connection:
    with connection.channel() as channel:
        # The queue definitions and producers do not depend on the loop
        # variable, so build and declare them once instead of on every
        # iteration.
        plain_queue = Queue(name='kombu_queue',
                            exchange=Exchange('kombu_queue', type='direct'),
                            routing_key='kombu_queue',
                            channel=channel,
                            durable=False,
                            )
        plain_queue.declare()

        priority_queue = Queue(name='kombu_queue_1',
                               exchange=Exchange('kombu_queue_1', type='direct'),
                               routing_key='kombu_queue_1',
                               channel=channel,
                               max_priority=10,  # enable message priorities on this queue
                               durable=False,
                               )
        priority_queue.declare()

        # NOTE(review): no exchange is passed to Producer, so delivery
        # presumably relies on the routing key equalling the queue name
        # (broker default exchange) -- confirm against the Kombu docs.
        plain_producer = Producer(channel, serializer='json', routing_key='kombu_queue')
        priority_producer = Producer(channel, serializer='json', routing_key='kombu_queue_1')

        for i in range(1, 10):
            plain_producer.publish({'name': 'kombu_queue', 'size': i})
            # The loop index doubles as the message priority (1..9).
            priority_producer.publish({'name': 'kombu_queue_1', 'size': i}, priority=i)
Consumer
from kombu.entity import Exchange, Queue
from kombu.messaging import Consumer
from kombu.connection import Connection


def test_1(body, message):
    print(body)
    print("my_consume_1")
    message.ack()


def test_2(body, message):
    print(message)
    print("my_consume_2")


def test_3(body, message):
    print("my_consume_3")


# Consumer demo: read from both queues on a single channel and pass every
# message to all three callbacks until interrupted with Ctrl-C.
with Connection('amqp://guest:guest@127.0.0.1:5672/test') as connection:
    with connection.channel() as channel:
        # Same name / exchange / routing key / durable / max_priority values
        # that the producer script declares for these two queues.
        kombu_queue = Queue(name='kombu_queue',
                            exchange=Exchange('kombu_queue', type='direct'),
                            routing_key='kombu_queue',
                            durable=False,
                            channel=channel,
                            )
        kombu_queue_1 = Queue(name='kombu_queue_1',
                              exchange=Exchange('kombu_queue_1', type='direct'),
                              routing_key='kombu_queue_1',
                              durable=False,
                              channel=channel,
                              max_priority=10,  # priority-enabled queue
                              )
        # NOTE(review): 'pickle' and 'yaml' are accepted content types here;
        # deserializing them from an untrusted broker is unsafe. The producer
        # in this article only sends JSON -- consider narrowing this list.
        #
        # Using the consumer as a context manager starts consuming on entry
        # and cancels the consumer on exit, replacing the explicit
        # consumer.consume() call.
        with Consumer(channel,
                      queues=[kombu_queue, kombu_queue_1],  # several queues
                      accept=['json', 'pickle', 'msgpack', 'yaml'],  # several content types
                      callbacks=[test_1, test_2, test_3],  # several callbacks per message
                      ) as consumer:
            try:
                while True:
                    # Block until the next event arrives and dispatch it to
                    # the callbacks.
                    connection.drain_events()
            except KeyboardInterrupt:
                # Ctrl-C ends the demo; the enclosing `with` blocks release
                # the consumer, channel and connection.
                pass

推荐阅读