spring-boot - Spring Boot中的事务同步无法正常工作
问题描述
事务同步和回滚无法正常工作。并且偶尔会给producerFencedException。我的配置或代码中是否有任何错误..?
- 我有多个弹簧靴实例
- 1个码头经纪人
- spring boot 版本:2.1.4 发布
卡夫卡发件人配置
@Configuration
@EnableKafka
public class KafkaSenderConfig{
@Value("${kafka.servers}")
private String kafkaServers;
@Value("${application.name}")
private String applicationName;
@Bean(value = "stringKafkaTransactionManager")
public KafkaTransactionManager<String, String> kafkaStringTransactionManager() {
KafkaTransactionManager<String, String> ktm = new KafkaTransactionManager<String, String>(stringProducerFactory());
ktm.setNestedTransactionAllowed(true);
ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
return ktm;
}
@Bean(value = "stringProducerFactory")
@Primary
public ProducerFactory<String, String> stringProducerFactory() {
Map<String, Object> config = new ConcurrentHashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
config.put(ProducerConfig.LINGER_MS_CONFIG, 100);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.ACKS_CONFIG, "all");
DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(config);
defaultKafkaProducerFactory.setTransactionIdPrefix("sample-trans-");
return defaultKafkaProducerFactory;
}
@Bean(value = "stringKafkaTemplate")
@Primary
public KafkaTemplate<String, String> stringKafkaTemplate() {
return new KafkaTemplate<>(stringProducerFactory(),true);
}
@Bean(name = "chainedStringKafkaTransactionManager")
@Primary
public ChainedKafkaTransactionManager<String, String> chainedTransactionManager(JpaTransactionManager jpaTransactionManager, DataSourceTransactionManager dsTransactionManager) {
return new ChainedKafkaTransactionManager<>(kafkaStringTransactionManager(), jpaTransactionManager, dsTransactionManager);
}
}
kafka接收器配置
@Configuration
@EnableKafka
public class KafkaReceiverConfig {
@Value("${kafka.servers}")
private String kafkaServers;
@Value("${kafka.groupId}")
private String groupId;
@Value("${kafka.retry.maxAttempts}")
private Integer retryMaxAttempts;
@Value("${kafka.retry.interval}")
private Long retryInterval;
@Value("${kafka.concurrency}")
private Integer concurrency;
@Value("${kafka.poll.timeout}")
private Integer pollTimeout;
@Value("${kafka.consumer.auto-offset-reset:earliest}")
private String offset = "earliest";
@Autowired
private PlatformTransactionManager transactionManager;
@Bean
public RetryPolicy retryPolicy() {
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
return simpleRetryPolicy;
}
@Bean
public BackOffPolicy backOffPolicy() {
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(retryInterval);
return backOffPolicy;
}
@Bean
public RetryTemplate retryTemplate(){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setSyncCommits(true);
factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setTransactionManager(transactionManager);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new ConcurrentHashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
return props;
}
@Bean(name = { "jsonConsumerFactory" })
public ConsumerFactory<String, Object> jsonConsumerFactory() {
Map<String, Object> props = new ConcurrentHashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean(name = { "kafkaJsonListenerContainerFactory" })
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
factory.setConsumerFactory(jsonConsumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
数据源配置
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(basePackages = "com.sample.entity.repository")
public class DatasourceConfig {
@Bean(name = "dataSourceProperties")
@ConfigurationProperties("spring.datasource")
public DataSourceProperties dataSourceProperties() {
return new DataSourceProperties();
}
@Bean(name = "datasource")
@Primary
public DataSource dataSource(@Qualifier("dataSourceProperties") DataSourceProperties properties) {
return properties.initializeDataSourceBuilder().type(HikariDataSource.class)
.build();
}
@Bean
public LocalContainerEntityManagerFactoryBean entityManagerFactory(@Qualifier("datasource") DataSource ds) throws PropertyVetoException {
LocalContainerEntityManagerFactoryBean entityManagerFactory = new LocalContainerEntityManagerFactoryBean();
entityManagerFactory.setDataSource(ds);
entityManagerFactory.setPackagesToScan(new String[]{"com.sample.entity.domain"});
JpaVendorAdapter jpaVendorAdapter = new HibernateJpaVendorAdapter();
entityManagerFactory.setJpaVendorAdapter(jpaVendorAdapter);
return entityManagerFactory;
}
@Bean
public DataSourceTransactionManager dsTransactionManager(@Qualifier("datasource") DataSource ds) {
return new DataSourceTransactionManager(ds);
}
@Bean
public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory){
return jpaTransactionManager(entityManagerFactory);
}
@Bean
public JpaTransactionManager jpaTransactionManager(EntityManagerFactory entityManagerFactory){
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setEntityManagerFactory(entityManagerFactory);
return transactionManager;
}
@Bean
public JdbcTemplate jdbcTemplate(@Qualifier("datasource") DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
}
产生带有交易的消息:
@Autowired
@Qualifier("stringKafkaTemplate")
private KafkaTemplate<String, String> stringKafkaTemplate;
@Autowired
private EmployeeRepository employeeRepository;
@Override
@Transactional
public void create(List<Employee> employees){
for (Employee emp : employees) {
employeeRepository.save(emp);
String jsonStr = JsonUtil.toString(emp);
stringKafkaTemplate.send("employee", jsonStr);
}
}
接收者
@KafkaListener(id = "employee", topics = "employee")
@Transactional(readOnly = false)
public void processRequest(@Payload String message) throws IOException {
/// its working fine
}
属性文件(Kafka 配置)
kafka.servers=localhost:9092
kafka.groupId=xyzabc
kafka.retry.maxAttempts=3
kafka.retry.interval=300000
kafka.concurrency=10
kafka.poll.timeout=1000
解决方案
看来您的听众正在接收Employee
对象并且您的生产者正在创建它们 - 即您没有create()
从听众那里调用。
正如我昨天在对您的另一个问题的评论中所说...
如果您在侦听器容器线程上生成消息,
transactional.id
则为<prefix><group>.<topic>.<partition>
. 由于不能将分区分配给多个实例,因此transactional.id
s 将是唯一的。如果您在容器线程的上下文之外生成消息,则transactional.id
(因此前缀)在实例之间必须是唯一的。如果您两者都做,您将需要 2 个不同的生产者工厂。
@Override
@Transactional
public void create(List<Employee> employees){
for (Employee emp : employees) {
employeeRepository.save(emp);
String jsonStr = JsonUtil.toString(emp);
stringKafkaTemplate.send("employee", jsonStr);
}
}
因此,由于您的事务仅在生产者端,因此您transactionIdPrefix
需要在每个实例上都是唯一的。
推荐阅读
- c - 将 16 位正数转换为二进制的程序
- reactjs - 只是为了一劳永逸,您究竟如何更新工作存储库中的 npm 包?
- sql - 如何使用 JPQL 查询存储为 BINARY(16) 的 UUID
- db2 - DB2:是否可以从应用程序禁止密码和使用可信连接连接到 DB2
- http - 将 api 密钥注入谷歌脚本的 UrlFetchApp 以获取 HTTP 请求
- c++ - 将 2D 数组转换为 1D 数组,其元素为 2D 数组中的值的总和
- javascript - JS中的console.log是浏览器提供的Web Api的一部分吗
- javascript - 如何从 flatpickr 中获取选定的日期
- reactjs - React Native 钩子,useRef 和 useEffect
- java - 如何阻止多个意图按顺序打开?