首页 > 技术文章 > RabbitMQ使用

liuyong1993 2019-03-08 18:20 原文

官网:https://www.rabbitmq.com/

RabbitMQ is the most widely deployed open source message broker.

RabbitMQ是最广泛部署开源的消息中间件。

 Spring-Boot项目引入依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>        

application.yml配置MQ信息:

#RabbitMQ
spring.rabbitmq:
  host: localhost
  port: 5672
  #username: admin
  #password: 123456

# 开启发送确认
publisher-confirms: true
# 开启发送失败退回
publisher-returns: true
# 开启ACK
listener.direct.acknowledge-mode: manual
listener.simple.acknowledge-mode: manual

配置类RabbitConfig声明队列queue1:

/**
 * RabbitMQ配置类
 */
@Configuration
public class RabbitConfig {
    public static final String QUEUE_FIRST = "queue1"; //队列名
    
    /**
     * 声明队列
     * @return
     */
    @Bean
    public Queue queueFirst() {
        return new Queue(QUEUE_FIRST);
    }
}

 Work Queues(工作模式)

生产者把消息直接发送到队列中,多个消费者绑定一个队列进行竞争消费。谁抢到谁执行.实用场景:秒杀业务 抢红包等

 生产者发送消息:

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * 测试发送消息到队列
     */
    @Test
    public void sendToQueue(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_FIRST, "你好,这是消息hello"+ i);
        }
    }
}

在另外一个项目里,2个消费者来接受消息:@RabbitListener注解,监听队列

@Component
public class Receiver {
    /**
     * 处理消息
     * @param content
     * @throws IOException 
     */
    @RabbitListener(queues = RabbitConfig.QUEUE_FIRST) //监听队列
    public void processMessage1(String content, Channel channel, Message message) throws IOException {
        try {
            System.out.println("消费者1收到消息:" + content);
            //手动确认消息,消费者发送一个消息应答rabbitMQ才会删除消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            //抛弃此条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } catch (Exception e) {
            e.printStackTrace();
            //重新放入队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
    
    @RabbitListener(queues = RabbitConfig.QUEUE_FIRST)
    public void processMessage2(String content, Channel channel, Message message) throws IOException {
        System.out.println("消费者2收到消息:" + content);
        try {
            //手动确认消息,消费者发送一个消息应答rabbitMQ才会删除消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            //抛弃此条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } catch (Exception e) {
            e.printStackTrace();
            //重新放入队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
        
    }
}

运行结果:多个消费者监听同一个队列,消息会均匀地发送给消费者 

Publish/Subscribe(发布订阅模式)

生产者把消息发布到交换机,交换机将消息发给N个队列,消费者绑定响应队列取消息即可,此功能比较适合将某单一系统的简单业务数据消息广播给所有接口

应用场景:邮件群发,群聊天,广告

 配置类RabbitConfig新增代码:声明队列queue2,fanout交换机,绑定交换机和队列:

    public static final String QUEUE_SECOND = "queue2"; //队列名

    @Bean
    public Queue queueSecond() {
        return new Queue(QUEUE_SECOND);
    }

  /**
     * 声明fanout交换机
     * @return
     */
    @Bean
    public Exchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
    
    /**
     * 队列绑定fanout交换机,不需要路由键(路由键会忽略)
     * @param queueFirst
     * @param fanoutExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueFirst, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueFirst).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeMessage1(Queue queueSecond, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueSecond).to(fanoutExchange);
    }    

生产者发送消息:

    /**
     * 测试发送消息到faout交换机
     */
    @Test
    public void sendFaout(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("fanoutExchange","", "发布订阅模式发消息:" + i);
        }
    }

把消费者2改为监听队列queue2,消费者1不用改,还是监听队列queue1

@RabbitListener(queues = RabbitConfig.QUEUE_SECOND)
    public void processMessage2(String content, Channel channel, Message message) throws IOException {
        System.out.println("消费者2收到消息:" + content);
        try {
            //手动确认消息,消费者发送一个消息应答rabbitMQ才会删除消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            //抛弃此条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } catch (Exception e) {
            e.printStackTrace();
            //重新放入队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
        
    }

运行结果:绑定此交换机的队列都收到了一样的消息

Routing(路由模式)

如果路由键完全匹配的话,消息才会被投放到相应的队列.amq.direct是rabbitMQ默认的持久化的交换机.

由于主题模式包含了路由模式,而且工作中基本用主题模式,就是交换机类型不一样,主题模式的路由规则更灵活。路由模式例子就不写了

Topics(主题模式)

模糊匹配,设置模糊的绑定方式,"*"操作符将"."视为分隔符,匹配单个单词;"#"操作符没有分块的概念,它将任意"."均视为关键字的匹配部分,能够匹配多个字符.

配置类RabbitConfig新增代码:声明topic交换机,绑定交换机和队列,队列queue1路由规则为topic.*,queue2路由规则为topic.#:

  /**
     * 声明topic交换机
     * @return
     */
    @Bean
    public Exchange topicExchange() {
        return new TopicExchange("topicExchange");
    }
    /**
     * 队列绑定topic交换机
     * @param queueFirst 队列Bean
     * @param topicExchange 交换机Bean
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueFirst, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueFirst).to(topicExchange).with("topic.*");
    }
    @Bean
    Binding bindingExchangeMessage1(Queue queueSecond, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueSecond).to(topicExchange).with("topic.#");
    }

和原来一样,消费者1监听队列queue1,消费者2监听queue2

发送消息:

    /**
     * 测试发送消息到topic交换机
     */
    @Test
    public void sendTopic(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("topicExchange","topic.msg.消息", "主题消息:你好" + i);
        }
    }

运行结果:发送消息的路由键为topuc.msg.消息,只能匹配上队列queue2,路由规则topic.#

 

发送消息的路由键改为:topic.msg

   /**
     * 测试发送消息到topic交换机
     */
    @Test
    public void sendTopic(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("topicExchange","topic.msg", "主题消息:你好" + i);
        }
    }

运行结果:发送消息的路由键为topic.msg,两个队列的路由都匹配上了。

 

 

推荐阅读