java - Spring事务同步不起作用(TransactionalEventListener)
问题描述
我知道这个问题在这个网站上以稍微不同的格式被问到,但是按照这些帖子上给出的建议让我无处可去。我已经花了将近两天的时间在这上面,我没有想法。
我们有一个 Spring Boot 微服务,它只是监听进入 IBM MQ 队列的消息,进行一些转换并将其转发到 Kafka 主题。我们希望这是事务性的,因此不会丢失任何消息(对我们的业务至关重要)。我们还希望能够对事务提交和回滚事件做出反应,以达到监控和支持的目的。
我只是关注了互联网上的一些“操作方法”位置,我可以使用@Transactional
如下注释以声明性方式轻松实现事务行为:
@Transactional(transactionManager = "chainedTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "DEV.QUEUE.1", containerFactory = "mqListenerContainerFactory", concurrency = "10")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
// Some work here including forward to Kafka topic:
// ...
// ...
// Then publish an event which is supposed to be acted on:
applicationEventPublisher.publishEvent(new MqConsumedEvent("JMS Correlation ID", "Message Payload"));
// Uncommented exception below to create a rollback scenario
// or comment it out to have the processing completed
throw new RuntimeException("No good Pal!");
}
正如预期的那样,在播放带有异常的消息时,由于事务管理器一次又一次地回滚,处理将永远旋转。这对我们有好处。
现在我们期望在我们的 listener 方法中发布的 MqConsumedEvent 被onRollback
以下方法拦截:
@Component
@Slf4j
public class MqConsumedEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = MqConsumedEvent.class)
public void onCommit(MqConsumedEvent event) {
log.info("MQ message with correlation id {} committed to Kafka", event.getCorrelationId());
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK, classes = MqConsumedEvent.class)
public void onRollback(MqConsumedEvent event) {
log.info("Failed to commit MQ message with correlation id {} to Kafka", event.getCorrelationId());
}
}
这没有发生。类似地注释掉监听器中抛出的异常使得我们的 MQ 消息被传递给 Kafka。但是该onCommit
方法没有被执行。
通过进一步的研究和弹簧调试,我相信这没有执行,因为弹簧认为在发布事件时没有活动事务,而我的事件只是被忽略了。在日志中评估TransactionSynchronizationManager.isActualTransactionActive()
和打印它表明false
这很难解释,因为正如我所说,当故意抛出异常时,事务会按预期回滚。
预先感谢您的意见。
更新:
我设置的断点让我执行了这个ApplicationListenerMethodTransactionalAdapter
类:
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
}
else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}
出于某种原因,我不理解第一个 if 条件为假。然后回退执行是false
因为我没有true
在我的@TransactionalEventListener
使用中设置它,它将最终在 else 分支上并跳过事件。
解决方案
推荐阅读
- node.js - Heroku postgresql 查询不在 Heroku 上工作,但在本地工作
- javascript - Leaflet API Overlaying 下拉菜单
- c# - 为什么 Assembly.GetTypes() 会触发 AppDomain.CurrentDomain.AssemblyResolve 事件?
- javascript - 使用未知密钥在 Javascript 中遍历 JSON 对象
- redux - ReduxToolkit 如何将自定义错误从 API 传递到 CreateAsync thunk 和 CreateReducer
- javascript - 木偶删除元素内的节点
- python - ValueError:x、y 和格式字符串不得为 None (matplotlib.pyplot)
- excel - 将下一个搜索条件复制到 Sheet1 B2
- homebrew - 在 M1 MacBook 上切换到原生 Homebrew 的建议方法是什么?
- css - CSS 代码中的 @from-width 和 @from-table 到底是什么