spring - 春季批量向Kafka写入数据
问题描述
Kafka 的新手,最近我正在尝试从 Spring Batch 中获取数据,然后写入 Kafka,但我不知道该怎么做。有人可以帮我弄清楚如何将数据写入卡夫卡吗?这是我用 SpringBatch 编写的用于获取数据的演示代码:
@Configuration 公共类 FileReader {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("flatFileWriter")
private ItemWriter<? super Demo1> flatFileWriter;
@Bean
public Job FileReaderJob() {
return jobBuilderFactory.get("FileReaderJob").start(FileReaderStep()).build();
}
private Step FileReaderStep() {
// TODO Auto-generated method stub
return stepBuilderFactory.get("FileReaderStep").<Demo1,Demo1>chunk(100).reader(flatFileReader())
.writer(flatFileWriter).build();
}
@Bean
@StepScope
public FlatFileItemReader<Demo1> flatFileReader() {
// TODO Auto-generated method stub
FlatFileItemReader<Demo1> reader = new FlatFileItemReader<Demo1>();
reader.setResource(new ClassPathResource("Demo1.csv"));
reader.setLinesToSkip(1);
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String [] {"id","first","last"});
DefaultLineMapper<Demo1> mapper = new DefaultLineMapper<>();
mapper.setLineTokenizer(tokenizer);
mapper.setFieldSetMapper(new FieldSetMapper<Demo1>() {
@Override
public Demo1 mapFieldSet(FieldSet fieldSet) throws BindException {
Demo1 demo1 = new Demo1();
demo1.setId(fieldSet.readLong("id"));
demo1.setFirst(fieldSet.readString("first"));
demo1.setLast(fieldSet.readString("last"));
// TODO Auto-generated method stub
return demo1;
}
});
mapper.afterPropertiesSet();
reader.setLineMapper(mapper);
return reader;
}
}
解决方案
即将推出的 Spring Batch v4.2 GA 将支持读取/写入数据到 Apache Kafka 主题。您已经可以在4.2.0.RC1 版本中试用了。
对于KafkaItemWriter
,您将需要配置一个KafkaTemplate
. 下面是作者的一个例子:
@Bean
public KafkaItemWriter<String, Demo1> kafkaItemWriter(KafkaTemplate<String, Demo1> kafkaTemplate) {
return new KafkaItemWriterBuilder<String, Demo1>()
.kafkaTemplate(kafkaTemplate)
.build();
}
您还可以查看Josh Long 在 Spring Batch 中关于 Kafka 支持的Spring Tips 一期。
推荐阅读
- django - Python manage.py runserver 不适用于我 PC 中的新项目或现有项目
- python - 使用 LSTM Keras 的验证结果不佳
- java - How to set nth page's permission and sub permission in hibernate?
- c# - 正确使用 QueueBackgroundWorkItem?
- python-3.x - 控制图形的边缘属性 python igraph
- amazon-dynamodb - 使用 DynamoDB 扫描的 API 超时(1000 条记录,13.25mb 大小)
- selenium - 如何通过 Selenium 使用 xpath 定位元素
- java - 为什么我不能将 Function.identity 称为收集器中的方法引用
- java - a different design pattern for this solution
- linux - 如何在最新的 apache netbeans 11.2 linux 中添加 jar 文件?