首页 > 解决方案 > 当从 amqp 转换消息发生错误时,弹簧整数 amqp 不会重试

问题描述

弹簧整合 amqp 版本:5.0.11

目标:当发生致命异常时,消息将被丢弃。但不是致命的,消息将重新排队并执行重试策略。

但就我而言,我有一个自定义消息转换器,当我的转换发生一些非致命错误时,它将始终重新排队并且永远不会进入重试策略。

我尝试阅读代码,AmqpInboundChannelAdapter.Listener#onMessage在重试之前,它转换消息,这意味着当消息转换发生一些错误时,它不会重试,将进入错误处理程序。

public void onMessage(final Message message, final Channel channel) throws Exception {
        boolean retryDisabled = AmqpInboundChannelAdapter.this.retryTemplate == null;
        try {
            if (retryDisabled) {
                createAndSend(message, channel);
            }
            else {
                 final org.springframework.messaging.Message<Object> toSend = createMessage(message, channel); 
                AmqpInboundChannelAdapter.this.retryTemplate.execute(context -> {
                            StaticMessageHeaderAccessor.getDeliveryAttempt(toSend).incrementAndGet();
                            setAttributesIfNecessary(message, toSend);
                            sendMessage(toSend);
                            return null;
                        },
                        (RecoveryCallback<Object>) AmqpInboundChannelAdapter.this.recoveryCallback);
            }
        }

我的代码如下:

  @Bean
public IntegrationFlow EndpointMessageAndConvertModelDlxFlow(
    @Qualifier("rabbitmqUnFatalExceptionRetryTemplate") RetryTemplate template,
    ConnectionFactory factory,
    EndpointCodeDelegatingMessageConverter converter) {
    final MailRabbitmqProperties.Queue queue = getQueueConfig(ENDPOINT_BUFFER_DLX_NODE,
        ENDPOINT_BUFFER_FUNCTION);
    template.setRetryPolicy(endpointMessageExceptionClassifierRetryPolicy()); //fatal err not go retry
    return IntegrationFlows.from(Amqp.inboundAdapter(factory, queue.getName())
        .configureContainer(smlc -> {
            smlc.acknowledgeMode(AcknowledgeMode.AUTO); //
            smlc.defaultRequeueRejected(true); //requeue
            final ConditionalRejectingErrorHandler errorHandler =
                new ConditionalRejectingErrorHandler(
                    new EndPointMessageFatalExceptionStrategy());
            smlc.errorHandler(errorHandler);
        })
        .retryTemplate(template) 
        .messageConverter(converter))
        .channel(mailActionCreateTopicChannel())
        .get();
}

我该如何解决这个问题,谢谢。

标签: springspring-integrationspring-rabbitspring-integration-amqp

解决方案


好吧,我们认为转换器错误确实是致命的。这就是为什么从 AMQP 消息的转换实际上是在重试循环之外完成的。

如果您确定转换器中的错误是间歇性的,请考虑在转换器中包含重试逻辑,因此当AmqpInboundChannelAdapter调用时,如果发生异常this.converter.fromMessage(message),您将被应用。RetryTemplate

看看你是否使用ProxyFactoryBeanRetryInterceptorBuilder开箱即用的转换器之一。否则,@Retryable可以将 a 应用于该convert()方法。

在 Spring Retry 项目中查看更多信息:https ://docs.spring.io/spring-batch/docs/current/reference/html/retry.html


推荐阅读