首页 > 解决方案 > Spring:带有 Jpa 和 Kafka 的 ChainedKafkaTransactionManager 不是原子的?

问题描述

我们的基础架构必须管理 2 个 Spring Boot 应用程序之间的信息交换。一些信息基于 Oracle DB。我们使用 Kafka 来通知哪些信息必须由接收者管理。

(注意:我必须概括代码)

我们有 Kafka 生产者,配置如下:

@Configuration
@Lazy(false)
public class KafkaConfig {

    //...

    @Bean(name="trManagerJpaKafka")
    public ChainedKafkaTransactionManager<Object, Object> chainedTm(KafkaTransactionManager<String, String> ktm,
            JpaTransactionManager jpaTransactionManager) {

        return new ChainedKafkaTransactionManager<>(jpaTransactionManager,ktm);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(KafkaOperations<?, ?> template,
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory, ChainedKafkaTransactionManager<Object, Object> chainedTm) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        
        configurer.configure(factory, kafkaConsumerFactory);
        
        //...

        factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer,new FixedBackOff(0L, 2)));
        factory.getContainerProperties().setTransactionManager(chainedTm);
        return factory;
    }

}

然后我们有一些服务产生 Kafka 消息,像这样

public class SomeService implements ISomeService {
    
    @Override
    @Transactional(transactionManager = "trManagerJpaKafka" , rollbackFor = { Exception.class })
    public boolean createMessage(Long id, CreateRDM createRDM) {

        ...
        
        Entity entity = new Entity();
        entity.setSomething(something);
        entity.setSomethingElse(somethingElse);
        
        entity = entityRepository.save(entity);

        JSONObject message = new JSONObject();
        
        try {
            message.put("id", entity.getId());
            kafkaProducerService.sendMessage(message.toString());
            
        ...
    }
    ...
}

public class SomeGenericKafkaService implements ISomeGenericKafkaService {

    ...

    public void sendMessage(String data) {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put(KafkaHeaders.TOPIC, someTopic);
        headers.put(KafkaHeaders.MESSAGE_KEY, UUID.randomUUID().toString());
        
        KafkaProducerCallback callback = new KafkaProducerCallback();
        
        kafkaTemplate.send(new GenericMessage<String>(data, headers)).addCallback(callback);
        
        if (callback.isError()) {
            throw new KafkaException(callback.getThrowable());
        }
    }   

    ...

}

在消费者 Spring Boot 应用程序中,我们有时会遇到这个问题:它使用来自 Kafka 的消息,但有时 DB 上的记录不存在,所以当我们尝试访问 DB 记录时出现异常......生产者中的事务不是原子的Jpa和卡夫卡?在Jpa插入之前提交Kafka消息?

同时,作为紧急解决方案,为了管理从 Oracle 检索数据的异常,我们在接收器中放置了一些重试,类似于他的:

    ...
    Optional<Entity> optionalEntity = entityRepository.findById(id);
    
    if (!optionalEntity.isPresent()) {
        for (int i = 0; i < kafkaDbMaxRetry; i++) {
            Thread.sleep(kafkaMessageDelay);
            optionalEntity = entityRepository.findById(id);
            if (optionalEntity.isPresent()) break;
        }
    }
    
    entity = optionalEntity.get();
    ...

如何调整生产者并避免消费者出现这种糟糕的代码?

标签: javaspringspring-bootspring-data-jpaspring-kafka

解决方案


链式事务管理器仅提供“Best Effort 1 Phase Commit”——Kafka 不能参与 JTA/XA 事务;用这些不同的技术提供原子更新是不可能的。

有关更多信息,请参阅https://www.infoworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html

您必须使您的侦听器具有幂等性 - 一种常见的模式是将主题/分区/偏移量与数据一起存储,以便您可以检查它是否已被处理。


推荐阅读