首页 > 解决方案 > Spring事务同步不起作用(TransactionalEventListener)

问题描述

我知道这个问题在这个网站上以稍微不同的格式被问到,但是按照这些帖子上给出的建议让我无处可去。我已经花了将近两天的时间在这上面,我没有想法。

我们有一个 Spring Boot 微服务,它只是监听进入 IBM MQ 队列的消息,进行一些转换并将其转发到 Kafka 主题。我们希望这是事务性的,因此不会丢失任何消息(对我们的业务至关重要)。我们还希望能够对事务提交和回滚事件做出反应,以达到监控和支持的目的。

我只是关注了互联网上的一些“操作方法”位置,我可以使用@Transactional如下注释以声明性方式轻松实现事务行为:

@Transactional(transactionManager = "chainedTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "DEV.QUEUE.1", containerFactory = "mqListenerContainerFactory", concurrency = "10")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
    // Some work here including forward to Kafka topic:
    // ...
    // ...

    // Then publish an event which is supposed to be acted on:
    applicationEventPublisher.publishEvent(new MqConsumedEvent("JMS Correlation ID", "Message Payload"));

    // Uncommented exception below to create a rollback scenario
    // or comment it out to have the processing completed
    throw new RuntimeException("No good Pal!");
}

正如预期的那样,在播放带有异常的消息时,由于事务管理器一次又一次地回滚,处理将永远旋转。这对我们有好处。

现在我们期望在我们的 listener 方法中发布的 MqConsumedEvent 被onRollback以下方法拦截:

@Component
@Slf4j
public class MqConsumedEventListener {
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = MqConsumedEvent.class)
    public void onCommit(MqConsumedEvent event) {
        log.info("MQ message with correlation id {} committed to Kafka", event.getCorrelationId());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK, classes = MqConsumedEvent.class)
    public void onRollback(MqConsumedEvent event) {
        log.info("Failed to commit MQ message with correlation id {} to Kafka", event.getCorrelationId());
    }
}

这没有发生。类似地注释掉监听器中抛出的异常使得我们的 MQ 消息被传递给 Kafka。但是该onCommit方法没有被执行。

通过进一步的研究和弹簧调试,我相信这没有执行,因为弹簧认为在发布事件时没有活动事务,而我的事件只是被忽略了。在日志中评估TransactionSynchronizationManager.isActualTransactionActive()和打印它表明false这很难解释,因为正如我所说,当故意抛出异常时,事务会按预期回滚。

预先感谢您的意见。

更新:

我设置的断点让我执行了这个ApplicationListenerMethodTransactionalAdapter类:

@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (TransactionSynchronizationManager.isSynchronizationActive() &&
            TransactionSynchronizationManager.isActualTransactionActive()) {
        TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
        TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
    }
    else if (this.annotation.fallbackExecution()) {
        if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
            logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
        }
        processEvent(event);
    }
    else {
        // No transactional event execution at all
        if (logger.isDebugEnabled()) {
            logger.debug("No transaction is active - skipping " + event);
        }
    }
}

出于某种原因,我不理解第一个 if 条件为假。然后回退执行是false因为我没有true在我的@TransactionalEventListener使用中设置它,它将最终在 else 分支上并跳过事件。

标签: javaspringspring-bootspring-transactions

解决方案


推荐阅读