spring-boot - Spring Cloud Stream 与 RabbitMQ binder 和 Transactional consumer/producer 与 DB 操作
问题描述
我有一个Spring Cloud Stream应用程序,它使用Rabbit Binder从RabbitMQ接收消息,更新我的数据库并发送一条或多条消息。我的应用程序可以概括为这个演示应用程序:
问题是它似乎不起作用@Transactional
(或者至少这是我的印象),因为如果出现异常,数据库将回滚,但即使消费者/生产者默认配置为已处理,也会发送消息。
鉴于我想要实现的是当发生异常时,我希望消费的消息在重试后转到 DLQ,数据库回滚并且不发送消息。
我怎样才能做到这一点?
这是我发送消息my-input
交换时演示应用程序的输出
2021-01-19 14:31:20.804 ERROR 59593 --- [nput.my-group-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking MyListener#process[1 args]; nested exception is java.lang.RuntimeException: MyError, failedMessage=GenericMessage [payload=byte[4], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=my-input, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=my-input.my-group, amqp_redelivered=false, id=006f733f-5eab-9119-347a-625570383c47, amqp_consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, sourceData=(Body:'[B@177259f3(byte[4])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=my-input, receivedRoutingKey=#, deliveryTag=2, consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, consumerQueue=my-input.my-group]), contentType=application/json, timestamp=1611063077789}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: MyError
at com.example.demo.MyListener.process(DemoApplication.kt:46)
at com.example.demo.MyListener$$FastClassBySpringCGLIB$$4381219a.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.example.demo.MyListener$$EnhancerBySpringCGLIB$$f4ed3689.process(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 29 more
message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto
解决方案
由于您将失败的消息发布到 DLQ,从 Rabbit 的角度来看,事务是成功的,原始消息被确认并从队列中删除,并且 Rabbit 事务被提交。
你不能做你想做的事republishToDlq
。
如果您使用正常的 DLQ 机制(republishToDlq=false
代理将原始消息发送到 DLQ)而不是使用额外的元数据重新发布,它将起作用。
如果您想使用元数据重新发布,您可以使用非事务性手动发布到 DLQ RabbitTemplate
(因此 DLQ 发布不会与其他发布一起回滚)。
编辑
这是一个如何做你需要的例子。
需要注意的几点:
- 我们必须添加一个错误处理程序来重新抛出异常。
- 我们必须将重试移动到侦听器容器而不是活页夹;否则,重试将在事务中发生,如果重试成功,则将多条消息存放在输出队列中。
- 要使有状态重试起作用,我们必须能够唯一标识每条消息;最简单的解决方案是让发件人设置一个唯一的
message_id
属性(例如 UUID)。
@SpringBootApplication
@EnableBinding(Processor.class)
public class So65792643Application {
public static void main(String[] args) {
SpringApplication.run(So65792643Application.class, args);
}
@Autowired
Processor processor;
@StreamListener(Processor.INPUT)
public void in(Message<String> in) {
System.out.println(in.getPayload());
processor.output().send(new GenericMessage<>(in.getPayload().toUpperCase()));
int attempt = RetrySynchronizationManager.getContext().getRetryCount();
if (in.getPayload().equals("okAfterRetry") && attempt == 1) {
System.out.println("success");
}
else {
throw new RuntimeException();
}
}
@Bean
RepublishMessageRecoverer repub(RabbitTemplate template) {
RepublishMessageRecoverer repub =
new RepublishMessageRecoverer(template, "DLX", "rk");
return repub;
}
@Bean
Queue dlq() {
return new Queue("my-output.dlq");
}
@Bean
DirectExchange dlx() {
return new DirectExchange("DLX");
}
@Bean
Binding dlqBinding() {
return BindingBuilder.bind(dlq()).to(dlx()).with("rk");
}
@ServiceActivator(inputChannel = "my-input.group1.errors")
void errorHandler(ErrorMessage message) {
MessagingException mex = (MessagingException) message.getPayload();
throw mex;
}
@RabbitListener(queues = "my-output.dlq")
void dlqListen(Message<String> in) {
System.out.println("DLQ:" + in);
}
@RabbitListener(queues = "my-output.group2")
void outListen(String in) {
if (in.equals("OKAFTERRETRY")) {
System.out.println(in);
}
else {
System.out.println("Should not see this:" + in);
}
}
/*
* We must move retries from the binder to stateful retries in the container so that
* each retry is rolled back, to avoid multiple publishes to output.
* See max-attempts: 1 in the yaml.
* In order for stateful retry to work, inbound messages must have a unique message_id
* property.
*/
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer> customizer(RepublishMessageRecoverer repub) {
return (container, destinationName, group) -> {
if ("group1".equals(group)) {
container.setAdviceChain(RetryInterceptorBuilder.stateful()
.backOffOptions(1000, 2.0, 10000)
.maxAttempts(2)
.recoverer(recoverer(repub))
.keyGenerator(args -> {
// or generate a unique key some other way
return ((org.springframework.amqp.core.Message) args[1]).getMessageProperties()
.getMessageId();
})
.build());
}
};
}
private MethodInvocationRecoverer<?> recoverer(RepublishMessageRecoverer repub) {
return (args, cause) -> {
repub.recover(((ListenerExecutionFailedException) cause).getFailedMessage(), cause);
throw new AmqpRejectAndDontRequeueException(cause);
};
}
}
spring:
cloud:
stream:
rabbit:
default:
producer:
transacted: true
consumer:
transacted: true
requeue-rejected: true
bindings:
input:
destination: my-input
group: group1
consumer:
max-attempts: 1
output:
destination: my-output
producer:
required-groups: group2
okAfterRetry
2021-01-20 12:45:24.385 WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
okAfterRetry
success
OKAFTERRETRY
notOkAfterRetry
2021-01-20 12:45:39.336 WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
notOkAfterRetry
2021-01-20 12:45:39.339 WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
DLQ:GenericMessage [payload=notOkAfterRetry, ..., x-exception-message...
推荐阅读
- xgboost - 将训练数据拟合到 xgboost 时出现分段错误
- python - 如何更新 Piplock 文件?
- c++ - 用零初始化 std::chrono::time_point 变量
- scala - 对象的方法如何引用该对象的内部类?
- javascript - 在移动设备上关闭和打开 Chrome 浏览器后 Swiper 幻灯片调整大小
- node.js - Nodejs:在查询执行期间等待API响应
- html - 我如何应用这个 CSS 类
- java - 为什么浏览器实例在 Selenium 和 TestNG 中执行后没有关闭/退出?
- python - 是否有任何算法可以检测图中最远的两个节点?对不起,如果这个问题可能是微不足道的
- python-2.7 - 如何使用 Microsoft 图形 API 在 Onedrive 中公开文件