首页 > 解决方案 > 在 SpringBoot 的一个事务中组发送 kafka 消息和数据库更新

问题描述

我需要在一个事务中执行多个操作

我可以发送消息并且不更新两个表(A 和 B)。我不能生成消息并更新其中一个表。

我正在尝试使用@Transactional注释来实现我的目标


import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

 @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
    public void handle(Event approvalEvent) {
        var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());

        entityService.approve(entity.getTransactionId());
        logService.logApproval(entity);
        producer.send(approvalEvent);
    }

我做对了吗?

标签: spring-bootapache-kafkaspring-data-jpaspring-transactions

解决方案


上述方法的问题在于,您在一个事务中与两个不同的系统(数据库和消息队列)进行交互。当一个系统上的操作成功而另一个系统上的操作失败时要处理的场景组合使解决方案变得复杂。

微服务世界中有一种模式可以处理完全相同的场景。它被称为发件箱模式。

你可以在这里阅读更多关于它的信息。

简短的摘要是您的数据库中有一个名为 outbox 的附加表,其中包含要发布到消息队列的消息。

在用于添加\更新实体的数据库事务中,您在发件箱表工具中插入一行,其中包含对该实体的操作的详细信息。

然后,您从发件箱表中异步读取行并通过轮询或使用更改数据捕获发布到消息队列。在此处查看使用 debezium的示例实现。

您的交易代码将如下所示。

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
public void handle(Event approvalEvent) {
    var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());

    entityService.approve(entity.getTransactionId());
    logService.logApproval(entity);
    //Outbox is the table containing the records to be published to MQ 
    outboxRepo.save(approvalEvent);
}

推荐阅读