java - Spring Batch - ItemWriter 正在写入 ItemReader 读取的相同对象,但不是通过 ItemProcessor 处理后返回的对象
问题描述
我的情况是:JdbcPagingItemReader
正在从 Oracle 数据库读取并返回对象让我们说“员工”。然后这个“ Employee
”对象被传递给处理器,以再次调用 db 以从多个表中提取更多信息并返回“ AggregatedEmployee
”对象(它实际上扩展了 Employee)。我正在使用KafkaItemWriter
将处理后的对象写入 Kafka,但不是在写入AggregatedEmployee
,而是 writer 正在尝试编写“员工”本身。
@Mahmoud Ben Hassine:我看到了你对 Spring Batch 的很多建议。请分享你的想法。
处理器接口代码:
public interface PageProcessor<T> {
<R extends Employee> R process(T page);
}
步骤豆代码:
@Bean
protected Step step1 (CompositeJdbcPagingItemReader <Employee> reader, KafkaItemWriter <String, AggregatedEmployee> writer) {
return steps.get("step1")
.<Employee, AggregatedEmployee>chunk(5).
reader(reader).
writer(writer).build();
}
ProcessorInterface代码的实现类:
public class EmployeeProcessor implements PageProcessor<Employee> {
private NamedParameterJdbcTemplate jdbcTemplate;
public void setDataSource(DataSource dataSource) {
jdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
}
@SuppressWarnings("unchecked")
@Override
public <R extends Employee> R process(Employee page) {
... implementation goes here
}
KafkaItemWriter 豆:
@Bean
KafkaItemWriter<String,AggregatedEmployee> writer(){
return new KafkaItemWriterBuilder<String, AggregatedEmployee>()
.kafkaTemplate(aggregatedEmployeekafkaTemplate)
.itemKeyMapper(aggregatedEmployee -> String.valueOf(aggregatedEmployee.getEmployeeId()))
.build();
}
编辑显示处理器:
public class CompositeJdbcPagingItemReader<T> extends JdbcPagingItemReader<T> {
private PageProcessor<T> pageProcessor;
public void setPageProcessor(PageProcessor<T> pageProcessor) {
this.pageProcessor = pageProcessor;
}
并且当 Reader bean 被创建时,处理器对象也被创建并通过上面显示的 setter 设置到 reader 中,并且在 EmployeeProcessor 中编写的处理器逻辑也被执行。
错误:
java.lang.ClassCastException: class com.sample.model.Employee cannot be cast to class com.sample.model.AggregatedEmployee (com.sample.model.Employee and com.sample.model.AggregatedEmployee are in unnamed module of loader 'app')
at org.springframework.batch.item.KeyValueItemWriter.write(KeyValueItemWriter.java:43)
解决方案
您没有在您的步骤上设置处理器:
@Bean
protected Step step1 (CompositeJdbcPagingItemReader <Employee> reader, KafkaItemWriter <String, AggregatedEmployee> writer) {
return steps.get("step1")
.<Employee, AggregatedEmployee>chunk(5).
reader(reader).
writer(writer).build();
}
您需要设置将在您的步骤中进行类型转换的处理器Employee
-> 。AggregatedEmployee
推荐阅读
- java - 我无法删除 mysql 表中的所有行
- javascript - 我想停止执行,直到 this.conn.getUserByAlternateNumber 完成工作并返回一个值
- php - post方法中的Php搜索表单不返回数据
- java - pg_advisory_lock 有效,但 pg_try_advisory_lock 无效
- php - 使用 Laravel 的 store() 存储使用 imagecopy() 创建的图像
- laravel - Laravel 在保存时自动更新 Eloquent 字段
- php - 从容器到主机的 Postgres 数据路径映射 - 主机上的数据目录为空
- php - --model 选项不存在
- javascript - 如何使用 jQuery 或 JavaScript 实时更改值?
- python - 为什么 tensorflow.map_fn 慢,下面的代码有什么问题?