java - 无法使用ChainedKafkaTransaction 同步 Kafka 和 MQ 事务
问题描述
我们有一个 Spring Boot 应用程序,它使用来自 IBM MQ 的消息进行一些转换并将结果发布到 Kafka 主题。我们为此使用https://spring.io/projects/spring-kafka 。我知道 Kafka 不支持 XA;但是,在文档中,我发现了一些关于使用 aChainedKafkaTransactionManager
链接多个事务管理器并同步事务的输入。相同的文档还提供了一个示例,说明如何在从 Kafka 读取消息并将它们存储在数据库中时同步 Kafka 和数据库。
我在我的 se 案例中遵循相同的示例,并将JmsTransactionManager
with链接到KafkaTransactionManager
a 的保护伞下ChainedKafkaTransactionManager
。bean定义如下:
@Bean({"mqListenerContainerFactory"})
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(this.connectionFactory());
factory.setTransactionManager(this.jmsTransactionManager());
return factory;
}
@Bean
public JmsTransactionManager jmsTransactionManager() {
return new JmsTransactionManager(this.connectionFactory());
}
@Bean("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<?, ?> chainedKafkaTransactionManager(
JmsTransactionManager jmsTransactionManager, KafkaTransactionManager kafkaTransactionManager) {
return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jmsTransactionManager);
}
@Transactional(transactionManager = "chainedKafkaTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "${myApp.sourceQueue}", containerFactory = "mqListenerContainerFactory")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
// Processing the message here then publishing it to Kafka using KafkaTemplate
kafkaTemplate.send(sourceTopic,transformedMessage);
// Then throw an exception just to test the transaction behaviour
throw new RuntimeException("Not good Pal!");
}
运行应用程序时,发生的事情是消息不断回滚到 MQ 队列中,但消息在 Kafka 主题中不断增长,这对我来说意味着 kafkaTemplate 交互不会回滚。
如果我根据文档很好地理解了这不应该是这种情况。“如果事务处于活动状态,则在事务范围内执行的任何 KafkaTemplate 操作都使用事务的 Producer。”
在我们的 application.yaml 中,我们通过设置将 Kafka 生产者配置为使用事务spring.kafka.producer.transaction-id-prefix
问题是我在这里缺少什么以及我应该如何解决它。预先感谢您的意见。
解决方案
消费者默认可以看到未提交的记录;将isolation.level
消费者属性设置read_committed
为避免接收来自回滚事务的记录。
推荐阅读
- python - Python 中未知的字符串格式 pd.to_datetime。尝试将此格式转换为日期时间格式时遇到问题
- visual-studio-code - 有没有办法从浏览器中的 VSCode 终端/调试控制台链接打开 .html?
- keycloak - keycloak auth/admin/master/console/whoami 401(未经授权)
- python - 查找文本中每个句子的长度
- reactjs - 在 React 上开玩笑,测试一个叫作 prop
- node.js - 在 MongoDB 中,如何根据对象的值对对象进行排序?
- python - 如何保存对 django 模型的 pdf 响应?
- azure - 带有转义引号的 ARM 部署多行字符串
- google-ad-manager - 数组中的 Google 发布商代码
- c# - 如何从 protobuf-net 传输的数据中得到正确的结果