首页 > 解决方案 > Spring Batch - ItemWriter 正在写入 ItemReader 读取的相同对象,但不是通过 ItemProcessor 处理后返回的对象

问题描述

我的情况是:JdbcPagingItemReader正在从 Oracle 数据库读取并返回对象让我们说“员工”。然后这个“ Employee”对象被传递给处理器,以再次调用 db 以从多个表中提取更多信息并返回“ AggregatedEmployee”对象(它实际上扩展了 Employee)。我正在使用KafkaItemWriter将处理后的对象写入 Kafka,但不是在写入AggregatedEmployee,而是 writer 正在尝试编写“员工”本身。

@Mahmoud Ben Hassine:我看到了你对 Spring Batch 的很多建议。请分享你的想法。

处理器接口代码:

public interface PageProcessor<T> {
       <R extends Employee> R process(T page);
}

步骤豆代码:

@Bean
protected Step step1 (CompositeJdbcPagingItemReader <Employee> reader, KafkaItemWriter <String, AggregatedEmployee> writer) {
    return steps.get("step1")
                .<Employee, AggregatedEmployee>chunk(5).
                reader(reader).
                writer(writer).build();
    }

ProcessorInterface代码的实现类:

public class EmployeeProcessor implements PageProcessor<Employee> {
    private NamedParameterJdbcTemplate jdbcTemplate;

    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <R extends Employee> R process(Employee page) {
             ... implementation goes here
    }

KafkaItemWriter 豆:

    @Bean
    KafkaItemWriter<String,AggregatedEmployee> writer(){
        return new KafkaItemWriterBuilder<String, AggregatedEmployee>()
                .kafkaTemplate(aggregatedEmployeekafkaTemplate)
                .itemKeyMapper(aggregatedEmployee -> String.valueOf(aggregatedEmployee.getEmployeeId()))
                .build();
    }

编辑显示处理器:

public class CompositeJdbcPagingItemReader<T> extends JdbcPagingItemReader<T> {
    private PageProcessor<T> pageProcessor;

    public void setPageProcessor(PageProcessor<T> pageProcessor) {
        this.pageProcessor = pageProcessor;
    }

并且当 Reader bean 被创建时,处理器对象也被创建并通过上面显示的 setter 设置到 reader 中,并且在 EmployeeProcessor 中编写的处理器逻辑也被执行。

错误:

java.lang.ClassCastException: class com.sample.model.Employee cannot be cast to class com.sample.model.AggregatedEmployee (com.sample.model.Employee and com.sample.model.AggregatedEmployee are in unnamed module of loader 'app')
    at org.springframework.batch.item.KeyValueItemWriter.write(KeyValueItemWriter.java:43)

标签: javaspringspring-bootapache-kafkaspring-batch

解决方案


您没有在您的步骤上设置处理器:

@Bean
protected Step step1 (CompositeJdbcPagingItemReader <Employee> reader, KafkaItemWriter <String, AggregatedEmployee> writer) {
return steps.get("step1")
            .<Employee, AggregatedEmployee>chunk(5).
            reader(reader).
            writer(writer).build();
}

您需要设置将在您的步骤中进行类型转换的处理器Employee-> 。AggregatedEmployee


推荐阅读