java - Spring Batch:KafkaItemReader 仅在消费者作业在生产者作业之前启动并且指定正确分区时才有效
问题描述
我有一个简单的 Spring Batch 作业,它从具有 3 个分区的 Kafka 主题中读取。我有以下意见/问题:
如果消费者作业在消息发布到主题后启动,消费者作业将无限等待消息。消费者仅在首先启动然后向主题生成消息时才消费消息。在现实世界中,我不能等待消息发布然后开始消费者工作。我该如何解决这个问题?
我的主题有 4 个分区,但只有当我向读者提供分区 0、1 和 2 时,消费者才有效。如果我也提供了分区 3,消费者 waitis infinitley 和 somtimes 也会抛出以下异常:
Exception is......................... : org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing the current consumed offsets org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing the current consumed offsets
消费者作业配置:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("job").incrementer(new RunIdIncrementer())
.start(testFileWritingStep()).build();
}
@Bean
public Step testFileWritingStep() {
return stepBuilderFactory.get("testFileWritingStep").<String, String>chunk(10)
.reader(testKafkaItemReader()).writer(testFileWriter()).build();
}
@Bean
public KafkaItemReader<String, String> testKafkaItemReader() {
Properties props = new Properties();
//not providing the actual broker hosts and ports on stackoverfow for security reasons..
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"somebroker:someport,somebroker:someport,somebroker:someport,somebroker:someport");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "src/main/resources/conf/trust.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "pass");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "src/main/resources/conf/loc.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "pass");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "pass");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaItemReaderBuilder<String, String>().partitions(0,1,2).consumerProperties(props)
.name("myreader").saveState(true).topic("mytopic").build();
}
@Bean
public FlatFileItemWriter<String> testFileWriter() {
FlatFileItemWriter<String> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource(
"I:/CK/data/output.dat"));
writer.setAppendAllowed(false);
writer.setShouldDeleteIfExists(true);
DelimitedLineAggregator<String> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
writer.setLineAggregator(lineAggregator);
return writer;
}
}
解决方案
推荐阅读
- c# - 在 MVVM 中正确放置属性的位置
- java - 读取 Excel 并获取嵌入对象的行号和文件扩展名
- c - 位移位 vs 数组索引,更适合 32 位 MCU 上的 usart 接口
- node.js - 将托管在 azure 上的 Web 应用程序指向 azure 上的数据库
- r - R错误:“选择”功能错误:选择(-c())
- excel - Excel 停止运行受保护的工作簿,如果解锁它运行良好
- java - 如何将依赖项注入 Jackson Custom 反序列化器
- oauth-2.0 - Ajex 无需登录 Active Directory 即可访问 Protect API
- python - 如何在 Python Cassandra Driver 客户端中禁用控制台日志记录?
- json - BigQuery 解析具有特殊字符的 json 子列