Spring Cloud Stream with the RabbitMQ binder: transactional consumer/producer with DB operations

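Question

I have a Spring Cloud Stream application that uses the Rabbit binder to receive messages from RabbitMQ, updates my database, and sends one or more messages. My application can be summarized by this demo application.

The problem is that @Transactional does not seem to work (or at least that is my impression): if an exception occurs, the database is rolled back, but the messages are still sent, even though the consumer and producer are configured as transacted by default.

What I want to achieve is that, when an exception occurs, the consumed message goes to the DLQ after the retries, the database is rolled back, and no messages are sent.

How can I achieve this?

Here is the output of the demo application when I send a message to the my-input exchange: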

2021-01-19 14:31:20.804 ERROR 59593 --- [nput.my-group-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking MyListener#process[1 args]; nested exception is java.lang.RuntimeException: MyError, failedMessage=GenericMessage [payload=byte[4], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=my-input, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=my-input.my-group, amqp_redelivered=false, id=006f733f-5eab-9119-347a-625570383c47, amqp_consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, sourceData=(Body:'[B@177259f3(byte[4])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=my-input, receivedRoutingKey=#, deliveryTag=2, consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, consumerQueue=my-input.my-group]), contentType=application/json, timestamp=1611063077789}]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: MyError
    at com.example.demo.MyListener.process(DemoApplication.kt:46)
    at com.example.demo.MyListener$$FastClassBySpringCGLIB$$4381219a.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at com.example.demo.MyListener$$EnhancerBySpringCGLIB$$f4ed3689.process(<generated>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    ... 29 more

message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto

Tags: spring-boot, rabbitmq, spring-cloud-stream, spring-transactions

Answer


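Because you are publishing the failed message to the DLQ, from Rabbit's perspective the transaction was successful: the original message is acknowledged and removed from the queue, and the Rabbit transaction is committed.

You cannot do what you want with republishToDlq.

It will work if you use the normal DLQ mechanism (republishToDlq=false, where the broker sends the original message to the DLQ) instead of republishing with extra metadata.

If you want to republish with the metadata, you can publish to the DLQ manually using a non-transactional RabbitTemplate (so the DLQ publish is not rolled back together with the other publishes).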
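
To make the commit-versus-rollback distinction concrete, here is a minimal plain-Java sketch. It is a toy model, not the Spring AMQP or binder API: `TxChannel`, `failingListener`, and `deliver` are hypothetical names, and it assumes that the republish to the DLQ takes part in the same transaction as the listener's own sends.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a transacted channel; not the Spring AMQP API. */
class TransactedChannelSketch {

    /** Publishes are buffered until commit, as on a transacted channel. */
    static final class TxChannel {
        final List<String> pending = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();

        void publish(String destination, String body) {
            pending.add(destination + ":" + body);
        }

        void commit() {
            delivered.addAll(pending);
            pending.clear();
        }

        void rollback() {
            pending.clear();
        }
    }

    /** Sends an output message and then fails, like the listener in the question. */
    static void failingListener(TxChannel channel, String payload) {
        channel.publish("my-output", payload.toUpperCase());
        throw new RuntimeException("MyError");
    }

    /**
     * republish == true: the failure is recovered by publishing to the DLQ on the
     * same channel, so the delivery ends normally and everything buffered commits.
     * republish == false: the transaction is rolled back and the broker is left to
     * dead-letter the original message (the broker side is not modelled here).
     */
    static List<String> deliver(String payload, boolean republish) {
        TxChannel channel = new TxChannel();
        try {
            failingListener(channel, payload);
            channel.commit();
        } catch (RuntimeException e) {
            if (republish) {
                channel.publish("dlq", payload);
                channel.commit();
            } else {
                channel.rollback();
            }
        }
        return channel.delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliver("hello world", true));  // [my-output:HELLO WORLD, dlq:hello world]
        System.out.println(deliver("hello world", false)); // []
    }
}
```

In the first call the unwanted output message is committed together with the DLQ copy, which is the behaviour described in the question; in the second call nothing the listener sent survives.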

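EDIT

Here is an example of how to do what you need.

A few things to note:

  1. We must add an error handler to rethrow the exception.
  2. We must move the retries to the listener container instead of the binder; otherwise the retries occur within the transaction and, if a retry succeeds, multiple messages are deposited in the output queue.
  3. For stateful retry to work, we must be able to uniquely identify each message; the simplest solution is to have the sender set a unique message_id property (for example, a UUID).
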
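import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.support.RetrySynchronizationManager;

@SpringBootApplication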
@EnableBinding(Processor.class)
public class So65792643Application {

    public static void main(String[] args) {
        SpringApplication.run(So65792643Application.class, args);
    }

    @Autowired
    Processor processor;

    @StreamListener(Processor.INPUT)
    public void in(Message<String> in) {
        System.out.println(in.getPayload());
        processor.output().send(new GenericMessage<>(in.getPayload().toUpperCase()));
        int attempt = RetrySynchronizationManager.getContext().getRetryCount();
        if (in.getPayload().equals("okAfterRetry") && attempt == 1) {
            System.out.println("success");
        }
        else {
            throw new RuntimeException();
        }
    }

    @Bean
    RepublishMessageRecoverer repub(RabbitTemplate template) {
        RepublishMessageRecoverer repub =
                new RepublishMessageRecoverer(template, "DLX", "rk");
        return repub;
    }

    @Bean
    Queue dlq() {
        return new Queue("my-output.dlq");
    }

    @Bean
    DirectExchange dlx() {
        return new DirectExchange("DLX");
    }

    @Bean
    Binding dlqBinding() {
        return BindingBuilder.bind(dlq()).to(dlx()).with("rk");
    }

    @ServiceActivator(inputChannel = "my-input.group1.errors")
    void errorHandler(ErrorMessage message) {
        MessagingException mex = (MessagingException) message.getPayload();
        throw mex;
    }

    @RabbitListener(queues = "my-output.dlq")
    void dlqListen(Message<String> in) {
        System.out.println("DLQ:" + in);
    }

    @RabbitListener(queues = "my-output.group2")
    void outListen(String in) {
        if (in.equals("OKAFTERRETRY")) {
            System.out.println(in);
        }
        else {
            System.out.println("Should not see this:" + in);
        }
    }

    /*
     * We must move retries from the binder to stateful retries in the container so that
     * each retry is rolled back, to avoid multiple publishes to output.
     * See max-attempts: 1 in the yaml.
     * In order for stateful retry to work, inbound messages must have a unique message_id
     * property.
     */
    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer> customizer(RepublishMessageRecoverer repub) {
        return (container, destinationName, group) -> {
            if ("group1".equals(group)) {
                container.setAdviceChain(RetryInterceptorBuilder.stateful()
                        .backOffOptions(1000, 2.0, 10000)
                        .maxAttempts(2)
                        .recoverer(recoverer(repub))
                        .keyGenerator(args -> {
                            // or generate a unique key some other way
                            return ((org.springframework.amqp.core.Message) args[1]).getMessageProperties()
                                    .getMessageId();
                        })
                        .build());
            }
        };
    }

    private MethodInvocationRecoverer<?> recoverer(RepublishMessageRecoverer repub) {
        return (args, cause) -> {
            repub.recover(((ListenerExecutionFailedException) cause).getFailedMessage(), cause);
            throw new AmqpRejectAndDontRequeueException(cause);
        };
    }

}

Configuration (YAML):

spring:
  cloud:
    stream:
      rabbit:
        default:
          producer:
            transacted: true
          consumer:
            transacted: true
            requeue-rejected: true
      bindings:
        input:
          destination: my-input
          group: group1
          consumer:
            max-attempts: 1
        output:
          destination: my-output
          producer:
            required-groups: group2

Output:

okAfterRetry
2021-01-20 12:45:24.385  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
okAfterRetry
success
OKAFTERRETRY

notOkAfterRetry
2021-01-20 12:45:39.336  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
notOkAfterRetry
2021-01-20 12:45:39.339  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
DLQ:GenericMessage [payload=notOkAfterRetry, ..., x-exception-message...
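
The difference between retrying inside one transaction and stateful retry (point 2 in the notes) can be modelled with a short plain-Java sketch. This is a toy simulation, not the Spring Retry or Spring AMQP API: `RetrySketch`, `retryInsideTransaction`, and `statefulRetry` are hypothetical names, the message ids are made up, and the broker's redelivery is modelled as a loop.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the two retry styles; none of this is the Spring Retry API. */
class RetrySketch {

    static final int MAX_ATTEMPTS = 2;

    final Map<String, Integer> attemptsByMessageId = new HashMap<>(); // stateful retry state
    final List<String> output = new ArrayList<>(); // committed sends to my-output
    final List<String> dlq = new ArrayList<>();    // dead-lettered payloads

    /** Like the listener above: send first, then succeed only on the second attempt of okAfterRetry. */
    static void listener(List<String> txBuffer, String payload, int attempt) {
        txBuffer.add(payload.toUpperCase());
        if (!(payload.equals("okAfterRetry") && attempt == 1)) {
            throw new RuntimeException("failed");
        }
    }

    /** Retry inside one transaction: sends from failed attempts commit with the successful one. */
    void retryInsideTransaction(String payload) {
        List<String> txBuffer = new ArrayList<>();
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            try {
                listener(txBuffer, payload, attempt);
                output.addAll(txBuffer); // commit everything buffered so far
                return;
            } catch (RuntimeException e) {
                // swallow and retry; nothing is rolled back between attempts
            }
        }
        dlq.add(payload); // attempts exhausted; the buffered sends are simply dropped in this model
    }

    /** Stateful retry: every delivery is its own transaction, rolled back on failure. */
    void statefulRetry(String messageId, String payload) {
        while (true) { // each iteration models one (re)delivery from the broker
            int attempt = attemptsByMessageId.getOrDefault(messageId, 0);
            if (attempt >= MAX_ATTEMPTS) {
                attemptsByMessageId.remove(messageId);
                dlq.add(payload); // recoverer: publish to the DLQ, then reject
                return;
            }
            List<String> txBuffer = new ArrayList<>();
            try {
                listener(txBuffer, payload, attempt);
                output.addAll(txBuffer); // commit this delivery only
                attemptsByMessageId.remove(messageId);
                return;
            } catch (RuntimeException e) {
                attemptsByMessageId.put(messageId, attempt + 1); // txBuffer is dropped: rollback
            }
        }
    }

    public static void main(String[] args) {
        RetrySketch inTx = new RetrySketch();
        inTx.retryInsideTransaction("okAfterRetry");
        System.out.println(inTx.output);     // [OKAFTERRETRY, OKAFTERRETRY]

        RetrySketch stateful = new RetrySketch();
        stateful.statefulRetry("id-1", "okAfterRetry");
        stateful.statefulRetry("id-2", "notOkAfterRetry");
        System.out.println(stateful.output); // [OKAFTERRETRY]
        System.out.println(stateful.dlq);    // [notOkAfterRetry]
    }
}
```

The retry state is keyed by message id because it has to survive between deliveries, which is why each inbound message needs a unique message_id property.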
