首页 > 解决方案 > spring.rabbitmq.listener.simple.retry.enabled=true 如果我手动配置 DirectMessageListenerContainer 将被忽略

问题描述

我正在尝试使用属性激活rabbitmq上的死信队列

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=10

当我使用注释时它工作正常

public class SimpleConsumer {

    @RabbitListener(queues = "messages.queue")
    public void handleMessage(String message){
        throw new RuntimeException(); 
    }
}

但是如果我手动配置 MessageListenerContainer,它就不起作用。

在我的配置下面:

@Bean
SimpleMessageListenerContainer directMessageListenerContainer(
        ConnectionFactory connectionFactory,
        Queue simpleQueue,
        MessageConverter jsonMessageConverter,
        SimpleConsumer simpleConsumer)
{

    return new SimpleMessageListenerContainer(connectionFactory){{
        setQueues(simpleQueue);
        setMessageListener(new MessageListenerAdapter(simpleConsumer, jsonMessageConverter));
       // setDefaultRequeueRejected(false);
    }};

}

如果我将 setDefaultRequeueRejected 设置为 true,它会尝试解决消费者无限时间(如果抛出异常)。

如果我将 setDefaultRequeueRejected 设置为 false,它会尝试解析消费者一次,然后使用 deadLetterConsumer。

@RabbitListener(queues = "messages.queue") 在使用 spring.rabbitmq.listener 配置时做了什么?

在我在 github 上的代码下方

https://github.com/crakdelpol/dead-letter-spike.git

请参阅分支“按配置重试”

标签: rabbitmqspring-amqpspring-annotationsspring-rabbit

解决方案


它将重试拦截器添加到容器的建议链中。请参阅文档

Spring Retry 提供了几个 AOP 拦截器和很大的灵活性来指定重试的参数(尝试次数、异常类型、退避算法等)。Spring AMQP 还提供了一些方便的工厂 bean,用于以方便的形式为 AMQP 用例创建 Spring Retry 拦截器,并具有可用于实现自定义恢复逻辑的强类型回调接口。有关更多详细信息,请参阅StatefulRetryOperationsInterceptor和的 Javadoc 和属性。StatelessRetryOperationsInterceptor

...

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(5)
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .build();
}

然后将拦截器添加到容器中adviceChain

编辑

请参阅我指向您的文档;您需要将恢复器添加到拦截器:

当所有重试都用尽时调用 MessageRecover。RejectAndDontRequeueRecoverer 正是这样做的。默认 MessageRecoverer 使用错误消息并发出 WARN 消息。

这是一个完整的例子:

@SpringBootApplication
public class So67433138Application {

    public static void main(String[] args) {
        SpringApplication.run(So67433138Application.class, args);
    }

    @Bean
    Queue queue() {
        return QueueBuilder.durable("so67433138")
                .deadLetterExchange("")
                .deadLetterRoutingKey("so67433138.dlq")
                .build();
    }

    @Bean
    Queue dlq() {
        return new Queue("so67433138.dlq");
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory cf) {
        SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer(cf);
        smlc.setQueueNames("so67433138");
        smlc.setAdviceChain(RetryInterceptorBuilder.stateless()
                .maxAttempts(5)
                .backOffOptions(1_000, 2.0, 10_000)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build());
        smlc.setMessageListener(msg -> {
            System.out.println(new String(msg.getBody()));
            throw new RuntimeException("test");
        });
        return smlc;
    }

    @RabbitListener(queues = "so67433138.dlq")
    void dlq(String in) {
        System.out.println("From DLQ: " + in);
    }

}
test
test
test
test
test
2021-05-12 11:19:42.034 WARN 70667 ---[    container-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message ...
...
From DLQ: test

推荐阅读