spring-boot - Spring Kafka Listener - 事务无法正常工作
问题描述
我正在运行最新版本的 Spring Boot 应用程序,使用 Spring Kafka,并以 MySQL 作为数据库,同时通过 ChainedKafkaTransactionManager 进行事务同步。
我想通过某个特定的侦听器更新同一个实体对象。但是,当多条更新同一个对象的消息同时到达主题时,后一个事务不会等待前一个事务对该对象的提交完成,因此数据变得不一致。
我尝试过在 JPA 存储库查询上使用悲观锁,但没有成功。
下面是在同时收到多条更新同一对象的消息时出现数据不一致的侦听器:
// method
@Autowired
private ProcessIndexRepository processIndexRepository;
@Transactional(readonly=false)
@KafkaListener(id = "update_process", topics = "update_process")
public update(@Payload String message){
ProcessModel processModel= JsonUtil.toObject(message,ProcessModel.class);
// from repository it will get old data instead of updated data
ProcessIndex process= processIndexRepository.findByProcessId(processModel.getId()).get();
if(processModel.getName()!=null){
process.set(processModel.getName())
}
if(processModel.getAge()>0){
processModel.setAge(processModel.getAge())
}
processIndexRepository.save(process);
}
Kafka 发送端配置
/**
 * Producer-side Kafka configuration.
 *
 * <p>Declares a transactional String/String producer factory, the Kafka template
 * built on it, a Kafka transaction manager, and a chained transaction manager that
 * combines the Kafka manager with the JPA and DataSource managers passed in.
 */
@Configuration
@EnableKafka
public class KafkaSenderConfig {

    /** Bootstrap server list, taken from the {@code kafka.servers} property. */
    @Value("${kafka.servers}")
    private String kafkaServers;

    /** Taken from the {@code application.name} property; not referenced inside this class. */
    @Value("${application.name}")
    private String applicationName;

    /**
     * Kafka transaction manager backed by {@link #stringProducerFactory()}, with
     * nested transactions allowed and transaction synchronization always active.
     */
    @Bean(value = "stringKafkaTransactionManager")
    public KafkaTransactionManager<String, String> kafkaStringTransactionManager() {
        KafkaTransactionManager<String, String> manager =
                new KafkaTransactionManager<>(stringProducerFactory());
        manager.setNestedTransactionAllowed(true);
        manager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
        return manager;
    }

    /**
     * Primary producer factory: String key/value serializers, idempotence enabled,
     * {@code acks=all}, and the transaction id prefix {@code sample-trans-}.
     */
    @Bean(value = "stringProducerFactory")
    @Primary
    public ProducerFactory<String, String> stringProducerFactory() {
        Map<String, Object> producerProps = new ConcurrentHashMap<>();
        // Connection and serialization.
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Batching.
        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        // Delivery guarantees.
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");

        DefaultKafkaProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(producerProps);
        factory.setTransactionIdPrefix("sample-trans-");
        return factory;
    }

    /** Primary Kafka template built on {@link #stringProducerFactory()}. */
    @Bean(value = "stringKafkaTemplate")
    @Primary
    public KafkaTemplate<String, String> stringKafkaTemplate() {
        // Second constructor argument of KafkaTemplate (auto-flush flag in the Spring Kafka API).
        boolean autoFlush = true;
        return new KafkaTemplate<>(stringProducerFactory(), autoFlush);
    }

    /**
     * Primary transaction manager chaining, in this order, the Kafka manager, the
     * JPA manager and the DataSource manager.
     *
     * @param jpaTransactionManager JPA transaction manager to include in the chain
     * @param dsTransactionManager  DataSource transaction manager to include in the chain
     */
    @Bean(name = "chainedStringKafkaTransactionManager")
    @Primary
    public ChainedKafkaTransactionManager<String, String> chainedTransactionManager(
            JpaTransactionManager jpaTransactionManager,
            DataSourceTransactionManager dsTransactionManager) {
        return new ChainedKafkaTransactionManager<>(
                kafkaStringTransactionManager(), jpaTransactionManager, dsTransactionManager);
    }
}
Kafka 接收端配置
@Configuration
@EnableKafka
public class KafkaReceiverConfig {
@Value("${kafka.servers}")
private String kafkaServers;
@Value("${kafka.groupId}")
private String groupId;
@Value("${kafka.retry.maxAttempts}")
private Integer retryMaxAttempts;
@Value("${kafka.retry.interval}")
private Long retryInterval;
@Value("${kafka.concurrency}")
private Integer concurrency;
@Value("${kafka.poll.timeout}")
private Integer pollTimeout;
@Value("${kafka.consumer.auto-offset-reset:earliest}")
private String offset = "earliest";
@Autowired
private PlatformTransactionManager transactionManager;
@Bean
public RetryPolicy retryPolicy() {
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
return simpleRetryPolicy;
}
@Bean
public BackOffPolicy backOffPolicy() {
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(retryInterval);
return backOffPolicy;
}
@Bean
public RetryTemplate retryTemplate(){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setSyncCommits(true);
factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setTransactionManager(transactionManager);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new ConcurrentHashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
return props;
}
@Bean(name = { "jsonConsumerFactory" })
public ConsumerFactory<String, Object> jsonConsumerFactory() {
Map<String, Object> props = new ConcurrentHashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean(name = { "kafkaJsonListenerContainerFactory" })
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
factory.setConsumerFactory(jsonConsumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
数据源配置
/**
 * Persistence configuration: the Hikari-backed data source, the JPA entity
 * manager factory, the JPA and DataSource transaction managers, and a
 * {@code JdbcTemplate} on the same data source.
 */
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(basePackages = "com.sample.entity.repository")
public class DatasourceConfig {

    /** Data source settings bound from the {@code spring.datasource} prefix. */
    @Bean(name = "dataSourceProperties")
    @ConfigurationProperties("spring.datasource")
    public DataSourceProperties dataSourceProperties() {
        return new DataSourceProperties();
    }

    /**
     * Primary data source, built as a {@code HikariDataSource} from the bound properties.
     *
     * @param properties the {@code dataSourceProperties} bean
     */
    @Bean(name = "datasource")
    @Primary
    public DataSource dataSource(@Qualifier("dataSourceProperties") DataSourceProperties properties) {
        return properties
                .initializeDataSourceBuilder()
                .type(HikariDataSource.class)
                .build();
    }

    /**
     * Entity manager factory scanning {@code com.sample.entity.domain} and using
     * the Hibernate JPA vendor adapter.
     *
     * @param ds the {@code datasource} bean
     */
    @Bean
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(@Qualifier("datasource") DataSource ds) throws PropertyVetoException {
        LocalContainerEntityManagerFactoryBean factoryBean = new LocalContainerEntityManagerFactoryBean();
        factoryBean.setDataSource(ds);
        factoryBean.setPackagesToScan("com.sample.entity.domain");
        factoryBean.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
        return factoryBean;
    }

    /**
     * Plain JDBC transaction manager on the {@code datasource} bean.
     *
     * @param ds the {@code datasource} bean
     */
    @Bean
    public DataSourceTransactionManager dsTransactionManager(@Qualifier("datasource") DataSource ds) {
        return new DataSourceTransactionManager(ds);
    }

    /**
     * Exposes the JPA transaction manager under the bean name {@code transactionManager}
     * by delegating to {@link #jpaTransactionManager(EntityManagerFactory)}.
     */
    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory){
        return jpaTransactionManager(entityManagerFactory);
    }

    /**
     * JPA transaction manager bound to the given entity manager factory.
     *
     * @param entityManagerFactory the factory whose transactions are managed
     */
    @Bean
    public JpaTransactionManager jpaTransactionManager(EntityManagerFactory entityManagerFactory){
        JpaTransactionManager jpaManager = new JpaTransactionManager();
        jpaManager.setEntityManagerFactory(entityManagerFactory);
        return jpaManager;
    }

    /**
     * {@code JdbcTemplate} operating on the {@code datasource} bean.
     *
     * @param dataSource the {@code datasource} bean
     */
    @Bean
    public JdbcTemplate jdbcTemplate(@Qualifier("datasource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}
属性文件(Kafka 配置)
# Bootstrap servers; injected into both KafkaSenderConfig and KafkaReceiverConfig.
kafka.servers=localhost:9092
# Consumer group id used by both consumer factories in KafkaReceiverConfig.
kafka.groupId=xyzabc
# Max attempts set on the SimpleRetryPolicy in KafkaReceiverConfig.
kafka.retry.maxAttempts=3
# Back-off period passed to FixedBackOffPolicy.setBackOffPeriod.
# NOTE(review): presumably milliseconds (i.e. 5 minutes) - confirm it stays below the consumer's max.poll.interval.ms.
kafka.retry.interval=300000
# Concurrency set on both listener container factories.
kafka.concurrency=10
# Poll timeout set on both listener containers' ContainerProperties.
kafka.poll.timeout=1000
当多条消息更新同一个对象时,后一个事务应该等待前一个事务提交,之后 JPA 存储库才读取已更新的对象,并在其基础上应用新的更改。
解决方案
推荐阅读
- ios - Swift 中 AVAudioPlayer 的这个 URL 有什么问题?
- typescript - 根据 TypeScript 中的参数定义字典中函数的返回类型
- rust - 在 bin crate 中链接 rlib 文件
- excel - Excel:在范围内查找特定(文本)值
- postgresql - 如何根据 postgres 中的 lat long 和 duration 来检查用户是在旅行还是在某个未知位置等待
- html - puppeter-sharp 中第一页/最后一页/奇数页/偶数页上的不同页眉/页脚
- python - 将列标题更改为行中的值
- docker - Docker & gitlab-ci:在 Dockerfile 中构建应用程序,同时创建工件
- javascript - 条码扫描器输入上的 DOM 重新加载
- python - 为什么索引列表比分配新列表慢?