rabbitmq - spring.rabbitmq.listener.simple.retry.enabled=true 如果我手动配置 DirectMessageListenerContainer 将被忽略
问题描述
我正在尝试使用属性激活rabbitmq上的死信队列
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=10
当我使用注释时它工作正常
public class SimpleConsumer {
@RabbitListener(queues = "messages.queue")
public void handleMessage(String message){
throw new RuntimeException();
}
}
但是如果我手动配置 MessageListenerContainer,它就不起作用。
在我的配置下面:
@Bean
SimpleMessageListenerContainer directMessageListenerContainer(
ConnectionFactory connectionFactory,
Queue simpleQueue,
MessageConverter jsonMessageConverter,
SimpleConsumer simpleConsumer)
{
return new SimpleMessageListenerContainer(connectionFactory){{
setQueues(simpleQueue);
setMessageListener(new MessageListenerAdapter(simpleConsumer, jsonMessageConverter));
// setDefaultRequeueRejected(false);
}};
}
如果我将 setDefaultRequeueRejected 设置为 true,它会尝试解决消费者无限时间(如果抛出异常)。
如果我将 setDefaultRequeueRejected 设置为 false,它会尝试解析消费者一次,然后使用 deadLetterConsumer。
@RabbitListener(queues = "messages.queue") 在使用 spring.rabbitmq.listener 配置时做了什么?
在我在 github 上的代码下方
https://github.com/crakdelpol/dead-letter-spike.git
请参阅分支“按配置重试”
解决方案
它将重试拦截器添加到容器的建议链中。请参阅文档。
Spring Retry 提供了几个 AOP 拦截器和很大的灵活性来指定重试的参数(尝试次数、异常类型、退避算法等)。Spring AMQP 还提供了一些方便的工厂 bean,用于以方便的形式为 AMQP 用例创建 Spring Retry 拦截器,并具有可用于实现自定义恢复逻辑的强类型回调接口。有关更多详细信息,请参阅
StatefulRetryOperationsInterceptor
和的 Javadoc 和属性。StatelessRetryOperationsInterceptor
...
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
然后将拦截器添加到容器中adviceChain
。
编辑
请参阅我指向您的文档;您需要将恢复器添加到拦截器:
当所有重试都用尽时调用 MessageRecover。RejectAndDontRequeueRecoverer 正是这样做的。默认 MessageRecoverer 使用错误消息并发出 WARN 消息。
这是一个完整的例子:
@SpringBootApplication
public class So67433138Application {
public static void main(String[] args) {
SpringApplication.run(So67433138Application.class, args);
}
@Bean
Queue queue() {
return QueueBuilder.durable("so67433138")
.deadLetterExchange("")
.deadLetterRoutingKey("so67433138.dlq")
.build();
}
@Bean
Queue dlq() {
return new Queue("so67433138.dlq");
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory cf) {
SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer(cf);
smlc.setQueueNames("so67433138");
smlc.setAdviceChain(RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffOptions(1_000, 2.0, 10_000)
.recoverer(new RejectAndDontRequeueRecoverer())
.build());
smlc.setMessageListener(msg -> {
System.out.println(new String(msg.getBody()));
throw new RuntimeException("test");
});
return smlc;
}
@RabbitListener(queues = "so67433138.dlq")
void dlq(String in) {
System.out.println("From DLQ: " + in);
}
}
test
test
test
test
test
2021-05-12 11:19:42.034 WARN 70667 ---[ container-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message ...
...
From DLQ: test
推荐阅读
- css - Flexboxgrid 和粘性侧边栏
- vue.js - 将 v-slot 值传递给同一元素
- c++ - 字符串向量在排序之前可以正常工作,但之后它不能做任何事情
- reactjs - 这段代码(来自 React 文档)在什么情况下可能会失败,或者实际上会触发最终的回退条件?
- performance - 将文件上传到部署为 Azure Web 应用程序的 Blazor 服务器应用程序的速度极慢
- java - 如何使用回收适配器的意图在 Web 浏览器中打开 URL
- python - 循环遍历另一个字典内的字典内列表的值
- python - Plotly Dash - 在条形图上添加水平线
- javascript - 如何在字典列表中获取带有键的元素
- azure - Okta 作为 Azure 的 IDP