首页 > 解决方案 > 春季批量向Kafka写入数据

问题描述

Kafka 的新手,最近我正在尝试从 Spring Batch 中获取数据,然后写入 Kafka,但我不知道该怎么做。有人可以帮我弄清楚如何将数据写入卡夫卡吗?这是我用 SpringBatch 编写的用于获取数据的演示代码:

@Configuration 公共类 FileReader {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("flatFileWriter")
private ItemWriter<? super Demo1> flatFileWriter;
@Bean
public Job FileReaderJob() {
    return jobBuilderFactory.get("FileReaderJob").start(FileReaderStep()).build();
}
private Step FileReaderStep() {
    // TODO Auto-generated method stub
    return stepBuilderFactory.get("FileReaderStep").<Demo1,Demo1>chunk(100).reader(flatFileReader())
            .writer(flatFileWriter).build();
}

@Bean
@StepScope
public FlatFileItemReader<Demo1> flatFileReader() {
    // TODO Auto-generated method stub

    FlatFileItemReader<Demo1> reader = new FlatFileItemReader<Demo1>();
    reader.setResource(new  ClassPathResource("Demo1.csv"));
    reader.setLinesToSkip(1);
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames(new String [] {"id","first","last"});
    DefaultLineMapper<Demo1> mapper = new DefaultLineMapper<>();
    mapper.setLineTokenizer(tokenizer);
    mapper.setFieldSetMapper(new FieldSetMapper<Demo1>() {

        @Override
        public Demo1 mapFieldSet(FieldSet fieldSet) throws BindException {
            Demo1 demo1 = new Demo1();
            demo1.setId(fieldSet.readLong("id"));
            demo1.setFirst(fieldSet.readString("first"));
            demo1.setLast(fieldSet.readString("last"));

            // TODO Auto-generated method stub
            return demo1;
        }
    });

    mapper.afterPropertiesSet();
    reader.setLineMapper(mapper);

    return reader;
}

}

标签: springspring-bootapache-kafkaspring-batchspring-kafka

解决方案


即将推出的 Spring Batch v4.2 GA 将支持读取/写入数据到 Apache Kafka 主题。您已经可以在4.2.0.RC1 版本中试用了。

对于KafkaItemWriter,您将需要配置一个KafkaTemplate. 下面是作者的一个例子:

@Bean
public KafkaItemWriter<String, Demo1> kafkaItemWriter(KafkaTemplate<String, Demo1> kafkaTemplate) {
    return new KafkaItemWriterBuilder<String, Demo1>()
            .kafkaTemplate(kafkaTemplate)
            .build();
}

您还可以查看Josh Long 在 Spring Batch 中关于 Kafka 支持的Spring Tips 一期。


推荐阅读