Spring Batch: KafkaItemReader only works if the consumer job is started before the producer job, and only with the correct partitions

Problem Description

I have a simple Spring Batch job that reads from a Kafka topic with 3 partitions. I have the following observations/questions:

  1. If the consumer job is started after messages have already been published to the topic, it waits for messages indefinitely. The consumer only consumes messages when it is started first and messages are produced to the topic afterwards. In the real world I cannot wait for messages to be published and only then start the consumer job. How can I solve this? (A configuration sketch follows the exception output below.)

  2. My topic has 4 partitions, but the consumer only works if I give the reader partitions 0, 1 and 2. If I also provide partition 3, the consumer waits indefinitely and sometimes also throws the following exception:

    Exception is......................... : 
    org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms 
    expired before successfully committing the current consumed offsets
    org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms 
    expired before successfully committing the current consumed offsets
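
The settings below are a minimal, hypothetical sketch for point 1, assuming Spring Batch 4.3+ (where KafkaItemReaderBuilder exposes partitionOffsets and pollTimeout). buildReader is an illustrative helper, not part of the original job, and the partition list 0-3 is an assumption, not the accepted answer:

import java.time.Duration;
import java.util.HashMap;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.batch.item.kafka.KafkaItemReader;
import org.springframework.batch.item.kafka.builder.KafkaItemReaderBuilder;

public class ReaderSketch {

    // Hypothetical helper showing two knobs that affect where the reader starts
    // reading and how long it blocks while waiting for records.
    public static KafkaItemReader<String, String> buildReader(Properties consumerProps) {
        // Only applies when the group has no committed offsets yet; with "earliest",
        // a consumer started late still sees records already on the topic.
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return new KafkaItemReaderBuilder<String, String>()
                .name("myreader")
                .topic("mytopic")
                .partitions(0, 1, 2, 3)              // assumes the topic really has 4 partitions
                .consumerProperties(consumerProps)
                // Spring Batch 4.3+: an empty map makes the reader start from the offsets
                // committed for the consumer group instead of offset 0 in each partition.
                .partitionOffsets(new HashMap<>())
                // read() returns null once a poll comes back empty after this timeout,
                // so the step ends instead of blocking forever on an idle partition.
                .pollTimeout(Duration.ofSeconds(30))
                .saveState(true)
                .build();
    }
}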
    

Consumer job configuration:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job").incrementer(new RunIdIncrementer())
                .start(testFileWritingStep()).build();
    }

    @Bean
    public Step testFileWritingStep() {
        return stepBuilderFactory.get("testFileWritingStep").<String, String>chunk(10)
                .reader(testKafkaItemReader()).writer(testFileWriter()).build();
    }

    @Bean
    public KafkaItemReader<String, String> testKafkaItemReader() {
        Properties props = new Properties();
        //not providing the actual broker hosts and ports on StackOverflow for security reasons.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "somebroker:someport,somebroker:someport,somebroker:someport,somebroker:someport");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "src/main/resources/conf/trust.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "pass");
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "src/main/resources/conf/loc.jks");
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "pass");
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "pass");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        return new KafkaItemReaderBuilder<String, String>().partitions(0,1,2).consumerProperties(props)
                .name("myreader").saveState(true).topic("mytopic").build();
    }

    @Bean
    public FlatFileItemWriter<String> testFileWriter() {
        FlatFileItemWriter<String> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource(
                "I:/CK/data/output.dat"));
        writer.setAppendAllowed(false);
        writer.setShouldDeleteIfExists(true);
        DelimitedLineAggregator<String> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(",");
        writer.setLineAggregator(lineAggregator);
        return writer;
    }
}
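
To sanity-check point 2, one can ask the broker how many partitions mytopic actually reports before assigning partition 3 to the reader. The class below is a hypothetical diagnostic, not part of the original job; the bootstrap address is a placeholder, and the SSL properties shown above would have to be added for it to connect:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class PartitionCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "somebroker:someport");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // partitionsFor returns the partition metadata the broker knows about.
            // If only partitions 0-2 are listed, assigning partition 3 to the reader
            // would point it at a partition that does not exist, which fits the hangs
            // and offset-commit timeouts described in the question.
            List<PartitionInfo> partitions = consumer.partitionsFor("mytopic");
            partitions.forEach(p -> System.out.println(
                    "partition " + p.partition() + ", leader " + p.leader()));
        }
    }
}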

Tags: java, apache-kafka, spring-batch, consumer

Solution

