首页 > 解决方案 > Spring Kafka Listener - 事务无法正常工作

问题描述

我正在运行最新版本的 Spring Boot 应用程序,使用 Spring Kafka,并以 MySQL 作为数据库,通过 ChainedKafkaTransactionManager 进行事务同步。

我想通过某个特定的侦听器更新同一个实体对象。但是,当多条更新同一对象的消息同时到达主题时,后一个事务不会等待前一个针对该对象的事务提交,因此数据变得不一致。

我尝试过在 JPA 存储库查询上使用悲观锁,但没有成功。

以下是在同时收到多条更新同一对象的消息时出现数据不一致的侦听器:

// method 
@Autowired
private ProcessIndexRepository  processIndexRepository;

@Transactional(readonly=false)
@KafkaListener(id = "update_process", topics = "update_process")
 public update(@Payload String message){

ProcessModel processModel= JsonUtil.toObject(message,ProcessModel.class);
// from repository it will get old data instead of updated data 
 ProcessIndex process= processIndexRepository.findByProcessId(processModel.getId()).get();

   if(processModel.getName()!=null){
      process.set(processModel.getName())
}
if(processModel.getAge()>0){
     processModel.setAge(processModel.getAge())
}

processIndexRepository.save(process);

}

Kafka 发送端配置

/**
 * Producer-side Kafka configuration.
 *
 * <p>Declares a String/String producer factory with a transaction id prefix, a
 * Kafka transaction manager and template built on that factory, and a primary
 * chained transaction manager that combines the Kafka, JPA and DataSource
 * transaction managers.
 */
@Configuration
@EnableKafka
public class KafkaSenderConfig {

    /** Injected from the {@code kafka.servers} property; used as the bootstrap servers. */
    @Value("${kafka.servers}")
    private String kafkaServers;

    /** Injected from the {@code application.name} property. */
    @Value("${application.name}")
    private String applicationName;

    /**
     * Kafka transaction manager backed by {@link #stringProducerFactory()}, with
     * nested transactions allowed and transaction synchronization always active.
     */
    @Bean(value = "stringKafkaTransactionManager")
    public KafkaTransactionManager<String, String> kafkaStringTransactionManager() {
        KafkaTransactionManager<String, String> kafkaTm =
                new KafkaTransactionManager<>(stringProducerFactory());
        kafkaTm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
        kafkaTm.setNestedTransactionAllowed(true);
        return kafkaTm;
    }

    /**
     * Primary producer factory for String keys and values. Idempotence is enabled,
     * {@code acks} is {@code "all"}, and the transaction id prefix is {@code sample-trans-}.
     */
    @Bean(value = "stringProducerFactory")
    @Primary
    public ProducerFactory<String, String> stringProducerFactory() {
        Map<String, Object> producerProps = new ConcurrentHashMap<>();

        // Connection and serialization.
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Delivery guarantees.
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");

        // Batching.
        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 100);

        DefaultKafkaProducerFactory<String, String> producerFactory =
                new DefaultKafkaProducerFactory<>(producerProps);
        producerFactory.setTransactionIdPrefix("sample-trans-");
        return producerFactory;
    }

    /** Primary template built on {@link #stringProducerFactory()}; the boolean constructor flag is {@code true}. */
    @Bean(value = "stringKafkaTemplate")
    @Primary
    public KafkaTemplate<String, String> stringKafkaTemplate() {
        ProducerFactory<String, String> producerFactory = stringProducerFactory();
        return new KafkaTemplate<>(producerFactory, true);
    }

    /**
     * Primary transaction manager chaining, in this order, the Kafka, JPA and
     * DataSource transaction managers.
     *
     * @param jpaTransactionManager JPA transaction manager bean
     * @param dsTransactionManager  DataSource transaction manager bean
     */
    @Bean(name = "chainedStringKafkaTransactionManager")
    @Primary
    public ChainedKafkaTransactionManager<String, String> chainedTransactionManager(
            JpaTransactionManager jpaTransactionManager,
            DataSourceTransactionManager dsTransactionManager) {
        return new ChainedKafkaTransactionManager<>(
                kafkaStringTransactionManager(),
                jpaTransactionManager,
                dsTransactionManager);
    }
}

Kafka 接收端配置

/**
 * Consumer-side Kafka configuration: retry beans, consumer factories and
 * listener container factories for String and JSON payloads.
 */
@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    /** Injected from {@code kafka.servers}; used as the consumers' bootstrap servers. */
    @Value("${kafka.servers}")
    private String kafkaServers;

    /** Injected from {@code kafka.groupId}; used as the consumer group id. */
    @Value("${kafka.groupId}")
    private String groupId;

    /** Injected from {@code kafka.retry.maxAttempts}; passed to the retry policy. */
    @Value("${kafka.retry.maxAttempts}")
    private Integer retryMaxAttempts;

    /** Injected from {@code kafka.retry.interval}; passed to the back-off policy as its period. */
    @Value("${kafka.retry.interval}")
    private Long retryInterval;

    /** Injected from {@code kafka.concurrency}; passed to both listener container factories. */
    @Value("${kafka.concurrency}")
    private Integer concurrency;

    /** Injected from {@code kafka.poll.timeout}; passed to the container properties of both factories. */
    @Value("${kafka.poll.timeout}")
    private Integer pollTimeout;

    /** Injected from {@code kafka.consumer.auto-offset-reset}, with {@code earliest} as the placeholder default. */
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    /** Autowired transaction manager; set on the container properties in {@code kafkaListenerContainerFactory()}. */
    @Autowired
    private PlatformTransactionManager transactionManager;

    /**
     * Retry policy whose maximum number of attempts comes from {@code retryMaxAttempts}.
     *
     * @return a {@code SimpleRetryPolicy} configured with the injected attempt limit
     */
    @Bean
    public RetryPolicy retryPolicy() {
        final SimpleRetryPolicy policy = new SimpleRetryPolicy();
        policy.setMaxAttempts(retryMaxAttempts);
        return policy;
    }
    /**
     * Fixed back-off policy whose period comes from {@code retryInterval}.
     *
     * @return a {@code FixedBackOffPolicy} configured with the injected period
     */
    @Bean
    public BackOffPolicy backOffPolicy() {
        final FixedBackOffPolicy fixedBackOff = new FixedBackOffPolicy();
        fixedBackOff.setBackOffPeriod(retryInterval);
        return fixedBackOff;
    }

    /**
     * Retry template combining {@link #retryPolicy()} and {@link #backOffPolicy()}.
     *
     * @return the configured {@code RetryTemplate}
     */
    @Bean
    public RetryTemplate retryTemplate() {
        final RetryTemplate template = new RetryTemplate();
        template.setBackOffPolicy(backOffPolicy());
        template.setRetryPolicy(retryPolicy());
        return template;
    }

    /**
     * Listener container factory for String/String records.
     *
     * <p>Uses {@link #consumerFactory()} and {@link #retryTemplate()}, applies the
     * injected concurrency and poll timeout, turns on synchronous commits, turns
     * off ack-on-error, and sets the autowired transaction manager on the
     * container properties.
     *
     * @return the configured container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();

        // Factory-level settings.
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setRetryTemplate(retryTemplate());
        containerFactory.setConcurrency(concurrency);

        // Container-properties settings.
        containerFactory.getContainerProperties().setTransactionManager(transactionManager);
        containerFactory.getContainerProperties().setPollTimeout(pollTimeout);
        containerFactory.getContainerProperties().setSyncCommits(true);
        containerFactory.getContainerProperties().setAckOnError(false);

        return containerFactory;
    }

    /**
     * Consumer factory for String/String records, built from {@link #consumerConfigs()}.
     *
     * @return a {@code DefaultKafkaConsumerFactory} over the shared consumer properties
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configs = consumerConfigs();
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    /**
     * Consumer properties for the String/String consumer factory.
     *
     * <p>Auto-commit is disabled here. The container built from these properties
     * in {@code kafkaListenerContainerFactory()} is given a transaction manager,
     * synchronous commits and ack-on-error=false, all of which rely on the
     * container committing offsets. With {@code enable.auto.commit=true} the
     * consumer would commit offsets on its own, independently of those settings.
     *
     * @return the consumer property map (read-committed isolation, injected
     *         servers, group id and offset-reset policy)
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Leave offset commits to the listener container (see method comment).
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return props;
    }

    /**
     * Consumer factory for String keys and JSON-decoded values.
     *
     * <p>The value deserializer is a JSON <em>deserializer</em>; the posted code
     * registered {@code JsonSerializer.class} under the deserializer key, which is
     * not a deserializer class. The fully qualified name is used so that no new
     * import is needed.
     *
     * <p>NOTE(review): assumes the {@code JsonSerializer} in the posted code is
     * Spring Kafka's, so its sibling {@code JsonDeserializer} is the intended
     * class. It may also need trusted-packages or default-type settings for
     * {@code Object} values — confirm against the producers of these topics.
     *
     * @return a {@code DefaultKafkaConsumerFactory} for JSON payloads
     */
    @Bean(name = { "jsonConsumerFactory" })
    public ConsumerFactory<String, Object> jsonConsumerFactory() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                org.springframework.kafka.support.serializer.JsonDeserializer.class);
        // NOTE(review): auto-commit is left as posted; the container factory using
        // this consumer factory sets syncCommits(true), which suggests
        // container-managed commits were intended — confirm.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory for String keys and JSON values.
     *
     * <p>Uses {@link #jsonConsumerFactory()}, applies the injected concurrency and
     * poll timeout, and turns on synchronous commits.
     *
     * @return the configured container factory
     */
    @Bean(name = { "kafkaJsonListenerContainerFactory" })
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaJsonListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> jsonFactory =
                new ConcurrentKafkaListenerContainerFactory<>();

        jsonFactory.setConcurrency(concurrency);
        jsonFactory.setConsumerFactory(jsonConsumerFactory());

        jsonFactory.getContainerProperties().setSyncCommits(true);
        jsonFactory.getContainerProperties().setPollTimeout(pollTimeout);

        return jsonFactory;
    }

数据源配置

/**
 * JDBC/JPA configuration: the Hikari data source, the entity manager factory,
 * the DataSource and JPA transaction managers, and a {@code JdbcTemplate}.
 */
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(basePackages = "com.sample.entity.repository")
public class DatasourceConfig {

    /** Data source properties bound from the {@code spring.datasource} prefix. */
    @Bean(name = "dataSourceProperties")
    @ConfigurationProperties("spring.datasource")
    public DataSourceProperties dataSourceProperties() {
        return new DataSourceProperties();
    }

    /**
     * Primary data source, built as a {@code HikariDataSource} from the bound properties.
     *
     * @param properties the {@code dataSourceProperties} bean
     */
    @Bean(name = "datasource")
    @Primary
    public DataSource dataSource(@Qualifier("dataSourceProperties") DataSourceProperties properties) {
        return properties
                .initializeDataSourceBuilder()
                .type(HikariDataSource.class)
                .build();
    }

    /**
     * Entity manager factory over the {@code datasource} bean, scanning
     * {@code com.sample.entity.domain} and using the Hibernate vendor adapter.
     *
     * @param ds the {@code datasource} bean
     */
    @Bean
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(@Qualifier("datasource") DataSource ds) throws PropertyVetoException {
        LocalContainerEntityManagerFactoryBean emfBean = new LocalContainerEntityManagerFactoryBean();
        emfBean.setDataSource(ds);
        emfBean.setPackagesToScan("com.sample.entity.domain");
        emfBean.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
        return emfBean;
    }

    /**
     * DataSource transaction manager over the {@code datasource} bean.
     *
     * @param ds the {@code datasource} bean
     */
    @Bean
    public DataSourceTransactionManager dsTransactionManager(@Qualifier("datasource") DataSource ds) {
        return new DataSourceTransactionManager(ds);
    }

    /**
     * Exposes the result of {@link #jpaTransactionManager(EntityManagerFactory)}
     * under the bean name {@code transactionManager}.
     *
     * @param entityManagerFactory the entity manager factory
     */
    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        return jpaTransactionManager(entityManagerFactory);
    }

    /**
     * JPA transaction manager bound to the given entity manager factory.
     *
     * @param entityManagerFactory the entity manager factory
     */
    @Bean
    public JpaTransactionManager jpaTransactionManager(EntityManagerFactory entityManagerFactory) {
        JpaTransactionManager jpaTm = new JpaTransactionManager();
        jpaTm.setEntityManagerFactory(entityManagerFactory);
        return jpaTm;
    }

    /**
     * {@code JdbcTemplate} over the {@code datasource} bean.
     *
     * @param dataSource the {@code datasource} bean
     */
    @Bean
    public JdbcTemplate jdbcTemplate(@Qualifier("datasource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}

属性文件(Kafka 配置)

# Kafka settings read through @Value placeholders in the Kafka configuration classes.
# Bootstrap servers for producers and consumers.
kafka.servers=localhost:9092
# Consumer group id.
kafka.groupId=xyzabc
# Maximum attempts given to the listener retry policy.
kafka.retry.maxAttempts=3
# Period given to the fixed back-off policy between retries.
# NOTE(review): presumably milliseconds (300000 = 5 minutes); a back-off this long
# may exceed the consumer's max poll interval — confirm.
kafka.retry.interval=300000
# Concurrency given to the listener container factories.
# NOTE(review): with concurrency above 1, updates to the same entity are presumably
# only processed in order when they share a partition (e.g. the same message key) — confirm.
kafka.concurrency=10
# Poll timeout given to the listener container properties.
kafka.poll.timeout=1000

期望行为:更新同一个对象时,应先等待前一个事务提交,之后 JPA 存储库才读取已更新的对象,并在其基础上继续更新。

标签: spring-boot spring-data-jpa spring-kafka

解决方案


推荐阅读