java - Spring:带有 Jpa 和 Kafka 的 ChainedKafkaTransactionManager 不是原子的?
问题描述
我们的基础架构必须管理 2 个 Spring Boot 应用程序之间的信息交换。一些信息基于 Oracle DB。我们使用 Kafka 来通知哪些信息必须由接收者管理。
(注意:我必须概括代码)
我们有 Kafka 生产者,配置如下:
@Configuration
@Lazy(false)
public class KafkaConfig {
//...
@Bean(name="trManagerJpaKafka")
public ChainedKafkaTransactionManager<Object, Object> chainedTm(KafkaTransactionManager<String, String> ktm,
JpaTransactionManager jpaTransactionManager) {
return new ChainedKafkaTransactionManager<>(jpaTransactionManager,ktm);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(KafkaOperations<?, ?> template,
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory, ChainedKafkaTransactionManager<Object, Object> chainedTm) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
//...
factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer,new FixedBackOff(0L, 2)));
factory.getContainerProperties().setTransactionManager(chainedTm);
return factory;
}
}
然后我们有一些服务产生 Kafka 消息,像这样
public class SomeService implements ISomeService {
@Override
@Transactional(transactionManager = "trManagerJpaKafka" , rollbackFor = { Exception.class })
public boolean createMessage(Long id, CreateRDM createRDM) {
...
Entity entity = new Entity();
entity.setSomething(something);
entity.setSomethingElse(somethingElse);
entity = entityRepository.save(entity);
JSONObject message = new JSONObject();
try {
message.put("id", entity.getId());
kafkaProducerService.sendMessage(message.toString());
...
}
...
}
public class SomeGenericKafkaService implements ISomeGenericKafkaService {
...
public void sendMessage(String data) {
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(KafkaHeaders.TOPIC, someTopic);
headers.put(KafkaHeaders.MESSAGE_KEY, UUID.randomUUID().toString());
KafkaProducerCallback callback = new KafkaProducerCallback();
kafkaTemplate.send(new GenericMessage<String>(data, headers)).addCallback(callback);
if (callback.isError()) {
throw new KafkaException(callback.getThrowable());
}
}
...
}
在消费者 Spring Boot 应用程序中,我们有时会遇到这个问题:它使用来自 Kafka 的消息,但有时 DB 上的记录不存在,所以当我们尝试访问 DB 记录时出现异常......生产者中的事务不是原子的Jpa和卡夫卡?在Jpa插入之前提交Kafka消息?
同时,作为紧急解决方案,为了管理从 Oracle 检索数据的异常,我们在接收器中放置了一些重试,类似于他的:
...
Optional<Entity> optionalEntity = entityRepository.findById(id);
if (!optionalEntity.isPresent()) {
for (int i = 0; i < kafkaDbMaxRetry; i++) {
Thread.sleep(kafkaMessageDelay);
optionalEntity = entityRepository.findById(id);
if (optionalEntity.isPresent()) break;
}
}
entity = optionalEntity.get();
...
如何调整生产者并避免消费者出现这种糟糕的代码?
解决方案
链式事务管理器仅提供“Best Effort 1 Phase Commit”——Kafka 不能参与 JTA/XA 事务;用这些不同的技术提供原子更新是不可能的。
您必须使您的侦听器具有幂等性 - 一种常见的模式是将主题/分区/偏移量与数据一起存储,以便您可以检查它是否已被处理。
推荐阅读
- c# - 在 .net core 3.0 上运行集成服务 SSIS
- spring-boot - Spring Boot 应用程序不会将日志写入 Wildfly 14 上的日志文件
- javascript - javascript中有没有办法避免在逻辑表达式中重复相同的变量?
- asp.net-core-mvc - 将多个 Get 参数传递给 WebApi
- c# - 如何将地标定位在 xamarin 表单图像视图上?
- android - 如何(正确)使用导航组件从前台服务打开 Activity/Fragment
- ios - 检查是否通过取消按钮取消了 UIAlertController 操作表?
- system-verilog - 如何显示来自 Modelsim / Synopsys 模拟器的 Verilog 力列表?
- python - Pandas Dataframe df.at KeyError
- vhdl - 如何在 VHDL 中添加 LUT 以生成正弦波