首页 > 技术文章 > springboot 2.X 集成RabbitMQ 详解(三)死信队列 + 延时队列

monco-sxy 2020-04-25 15:35 原文

死信队列:
死信交换器是 RabbitMQ 对 AMQP 规范的一个扩展,往往用在对问题消息的诊断上(主要针对消费者),还有延时队列的功能。
消息变成死信一般是以下三种情况:

  • 消息被拒绝,并且设置 requeue 参数为 false
  • 消息过期(默认情况下 Rabbit 中的消息不过期,但是可以设置队列的过期时间和消息的过期时间以达到消息过期的效果)
  • 队列达到最大长度(一般当设置了最大队列长度或大小并达到最大值时)
    死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作。在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,相关的参数是 x-dead-letter-exchange。

死信交换器的原理是这样的,假设有一个交换器和队列,这一套消息机制是可以正常运行的,但是由于特殊愿意,发现消费者接收到消息之后,发现这个消息不符合消费的标准,我们可以将这条消费失败的消息加入到另外一个交换器和队列体系中,进行特殊处理。如下图,dlx_direct_exchange 交换器就是为了接收normal_direct_exchange 交换器拒绝的消息。

配置信息

  //----------------------死信队列 模式  重中之重 ------------------------------------------------

    /**
     * 定义一个死信交换器 和普通的交换器没有差别
     */
    @Bean
    public DirectExchange dlxDirectExchange(){
        return new DirectExchange("dlx_direct_exchange");
    }

    /**
     * 定义一个监听死信交换器的死信队列
     * @return
     */
    @Bean
    public Queue dlxQueue(){
        return new Queue("direct.dlx.queue");
    }

    /**
     * 将死信队列绑定到死信路由器
     */
    @Bean
    public Binding bingDlxQueue() {
        return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).withQueueName();
    }

    /**
     * 定义一个普通的direct 交换器
     */
    @Bean
    public DirectExchange normalDirectExchange(){
        return new DirectExchange("normal_direct_exchange");
    }

    /**
     * 定义一个普通的队列
     */
    @Bean
    public Queue normalQueue(){
        Map<String, Object> args = new HashMap<>(3);
        // 绑定死信交换器
        args.put("x-dead-letter-exchange", "dlx_direct_exchange");
        // 绑定死信路由键
        args.put("x-dead-letter-routing-key", "direct.dlx.queue");
        return new Queue("normal.queue", true, false, false, args);
    }

    /**
     * 将普通的队列和交换器绑定
     */
    @Bean
    public Binding bingNormalQueue() {
        return BindingBuilder.bind(normalQueue()).to(normalDirectExchange()).withQueueName();
    }

消费方

    @RabbitHandler
    @RabbitListener(queues = "direct.dlx.queue")
    public void processDirectDlx(Message message, Channel channel) throws IOException {
        // 手动应答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.info("dlx queue receive message" + new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = "normal.queue")
    public void processNormal(Message message, Channel channel) throws IOException {
        // 手动应答
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        log.info("normal queue receive message" + new String(message.getBody()));
    }

生产方

  @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("message")
    public void sendDelayMessage() throws InterruptedException {
        String msg = "send delay message";
        log.info(msg);
        rabbitTemplate.convertAndSend("normal_direct_exchange","normal.queue","send a message");
    }

结果:

从结果可以看出,及时拒绝了消息,程序还是会继续向下进行的,不会因为消息的处理而终止。

延时队列
可以配置 队列的TTL 也可以设置 消息的TTL
此篇博客写的较好 https://www.cnblogs.com/mfrank/p/11260355.html
我这边用队列的处理方式

    /**
     * 定义一个普通的队列
     */
    @Bean
    public Queue normalQueue(){
        Map<String, Object> args = new HashMap<>(3);
        // 绑定死信交换器
        args.put("x-dead-letter-exchange", "dlx_direct_exchange");
        // 绑定死信路由键
        args.put("x-dead-letter-routing-key", "direct.dlx.queue");
        // 设置队列失效时间 4000ms
        args.put("x-message-ttl", 4000);
        return new Queue("normal.queue", true, false, false, args);
    }

然后我将 消费者注释掉了 然后就出现了如下结果:

推荐阅读