spring - 当从 amqp 转换消息发生错误时,弹簧整数 amqp 不会重试
问题描述
弹簧整合 amqp 版本:5.0.11
目标:当发生致命异常时,消息将被丢弃。但不是致命的,消息将重新排队并执行重试策略。
但就我而言,我有一个自定义消息转换器,当我的转换发生一些非致命错误时,它将始终重新排队并且永远不会进入重试策略。
我尝试阅读代码,AmqpInboundChannelAdapter.Listener#onMessage
在重试之前,它转换消息,这意味着当消息转换发生一些错误时,它不会重试,将进入错误处理程序。
public void onMessage(final Message message, final Channel channel) throws Exception {
boolean retryDisabled = AmqpInboundChannelAdapter.this.retryTemplate == null;
try {
if (retryDisabled) {
createAndSend(message, channel);
}
else {
final org.springframework.messaging.Message<Object> toSend = createMessage(message, channel);
AmqpInboundChannelAdapter.this.retryTemplate.execute(context -> {
StaticMessageHeaderAccessor.getDeliveryAttempt(toSend).incrementAndGet();
setAttributesIfNecessary(message, toSend);
sendMessage(toSend);
return null;
},
(RecoveryCallback<Object>) AmqpInboundChannelAdapter.this.recoveryCallback);
}
}
我的代码如下:
@Bean
public IntegrationFlow EndpointMessageAndConvertModelDlxFlow(
@Qualifier("rabbitmqUnFatalExceptionRetryTemplate") RetryTemplate template,
ConnectionFactory factory,
EndpointCodeDelegatingMessageConverter converter) {
final MailRabbitmqProperties.Queue queue = getQueueConfig(ENDPOINT_BUFFER_DLX_NODE,
ENDPOINT_BUFFER_FUNCTION);
template.setRetryPolicy(endpointMessageExceptionClassifierRetryPolicy()); //fatal err not go retry
return IntegrationFlows.from(Amqp.inboundAdapter(factory, queue.getName())
.configureContainer(smlc -> {
smlc.acknowledgeMode(AcknowledgeMode.AUTO); //
smlc.defaultRequeueRejected(true); //requeue
final ConditionalRejectingErrorHandler errorHandler =
new ConditionalRejectingErrorHandler(
new EndPointMessageFatalExceptionStrategy());
smlc.errorHandler(errorHandler);
})
.retryTemplate(template)
.messageConverter(converter))
.channel(mailActionCreateTopicChannel())
.get();
}
我该如何解决这个问题,谢谢。
解决方案
好吧,我们认为转换器错误确实是致命的。这就是为什么从 AMQP 消息的转换实际上是在重试循环之外完成的。
如果您确定转换器中的错误是间歇性的,请考虑在转换器中包含重试逻辑,因此当AmqpInboundChannelAdapter
调用时,如果发生异常this.converter.fromMessage(message)
,您将被应用。RetryTemplate
看看你是否使用ProxyFactoryBean
了RetryInterceptorBuilder
开箱即用的转换器之一。否则,@Retryable
可以将 a 应用于该convert()
方法。
在 Spring Retry 项目中查看更多信息:https ://docs.spring.io/spring-batch/docs/current/reference/html/retry.html
推荐阅读
- django - DJANGO - simple-history - 如何使用标准 django ORM 查找从基本模型中查找历史表
- sql - 预期的参数,多个文件上传 Symfony?
- java - Convert List
to new Object[]{} - firebase - AngularFire2:ERROR 错误:权限缺失或不足
- php - click a link to bring up modal
- python - 将数据附加到熊猫数据框中
- java - Java print to file from multiple methods with appending the text
- encryption - Hash Function to create unique hash for 70 million Strings with return type long
- web-scraping - X-Instagram-GIS 标头生成
- arrays - 如何从多维数组中将空值替换为“”