database - Spring boot:Exchange 发生错误时的 RabbitMq 和数据库管理
问题描述
如果找不到交换,我在管理 RabbitMQ 和数据库事务时遇到问题。这是简单的顺序:
- 将消息发送到 Exchange
- 在数据库中将已发送消息标记为已发送
当找不到 Exchange 时,不会发送消息,但是会在数据库中更新行,这不尊重事务行为。针对其他错误情况(DB 错误或 RabbitMQ 不可用)正确管理事务。
如何将此用例作为事务处理进行管理?
配置中启用的事务:
@Bean
@ConditionalOnMissingClass("org.springframework.orm.jpa.JpaTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jacksonMessageConverter());
template.setChannelTransacted(true);
return template;
}
我的服务:
@Override
@Transactional
public void push(Message message) {
rabbitTemplate.convertAndSend(
"MessageExchange",
"binding.key",
objectMapper.writeValueAsString(message));
repository.markAsSent(message.getId());
}
离开方法后引发的错误,不在rabbitTemplate.convertAndSend
方法中:
[AMQP Connection] ERROR o.s.a.r.c.CachingConnectionFactory.log : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'MessageExchange' in vhost '/', class-id=60, method-id=40)
[ThreadPoolTaskScheduler1] ERROR o.s.t.s.TransactionSynchronizationUtils.invokeAfterCompletion : TransactionSynchronization.afterCompletion threw exception
java.lang.IllegalStateException: Channel closed during transaction
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1171)
at com.sun.proxy.$Proxy143.txCommit(Unknown Source)
at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:153)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$RabbitResourceSynchronization.afterCompletion(ConnectionFactoryUtils.java:332)
解决方案
推荐阅读
- python - Python 子进程:提供标准输入,读取标准输出,然后提供更多标准输入
- sql - 如何将行转换为分组数据的列?
- php - Google Content API for Shopping:创建货件时出错
- python-3.x - 为什么“IF”之后的其他子句不起作用,当嵌套在while循环完成时
- javascript - 如何使用 vscode 获取用户输入(无 html)
- r - R gganimate 迭代通过颜色、大小、alpha 强调的组
- c# - Unity如何让我的枪在玩家移动时停止射击?
- python - 建立一个空变量以备将来使用?
- java - 可以手动将 JSON 或 JSONB postgresql 数据类型与 spring r2dbc 一起使用吗?
- php - 使用 php/wordpress 中的 date_i18n 函数的日期和时间之间的空间