spring-boot - 在 SpringBoot 的一个事务中组发送 kafka 消息和数据库更新
问题描述
我需要在一个事务中执行多个操作
- 产生卡夫卡消息
- 更新表 A
- 更新表 B
我可以发送消息并且不更新两个表(A 和 B)。我不能生成消息并更新其中一个表。
我正在尝试使用@Transactional
注释来实现我的目标
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
public void handle(Event approvalEvent) {
var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());
entityService.approve(entity.getTransactionId());
logService.logApproval(entity);
producer.send(approvalEvent);
}
我做对了吗?
解决方案
上述方法的问题在于,您在一个事务中与两个不同的系统(数据库和消息队列)进行交互。当一个系统上的操作成功而另一个系统上的操作失败时要处理的场景组合使解决方案变得复杂。
微服务世界中有一种模式可以处理完全相同的场景。它被称为发件箱模式。
你可以在这里阅读更多关于它的信息。
简短的摘要是您的数据库中有一个名为 outbox 的附加表,其中包含要发布到消息队列的消息。
在用于添加\更新实体的数据库事务中,您在发件箱表工具中插入一行,其中包含对该实体的操作的详细信息。
然后,您从发件箱表中异步读取行并通过轮询或使用更改数据捕获发布到消息队列。在此处查看使用 debezium的示例实现。
您的交易代码将如下所示。
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
public void handle(Event approvalEvent) {
var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());
entityService.approve(entity.getTransactionId());
logService.logApproval(entity);
//Outbox is the table containing the records to be published to MQ
outboxRepo.save(approvalEvent);
}
推荐阅读
- pine-script - 您可以将自定义交易视图松树脚本添加到筛选器吗?
- r - 使用R在plolty中将悬停文本添加到箱线图
- node.js - 使用 2019 年巴西夏令时的节点
- php - xpath php 7 与 php 5.6
- c - 如何将值分配给双指针数组,该数组是 c 中结构的成员,当它传递给其他函数时
- java - Anylogic microsoft sql error The index 1 is out of range
- java - 用于云 kubernetes 的 Spring Cloud Config Server 与 ConfigMaps
- javascript - 创建工作表时在 Sheetjs 中设置日期格式
- ios - 在 UIScrollView 中单击按钮时将 UITextFields 添加到另一个下方
- mysql - 如何将 mysql 查询“按语句分组”结果转换为 json 结构?