首页 > 解决方案 > Spring Batch - 分块和多线程步骤 - RowMapper 中的 Nullpointer 异常

问题描述

当我在多个线程中运行我的步骤时,我在我的行映射器中得到一个空指针异常,同时处理结果集,即使对于具有显式空检查的条目也是如此。taskExecutor()当我在没有/on 单线程的情况下执行它时工作正常。我对几件事感到困惑。我的理解是,如果我将提交间隔指定为 100,核心线程计数为 10,则每个线程都会拉出 100 个块并独立处理它。

代码如下:

@Bean
public Step myStep() {
    return stepBuilderFactory.get(STEP_NAME).<MyModel, MyModel> chunk(1000)
            .reader(myModelReader())
            .writer(myModelWriter())
            .taskExecutor(taskExecutor())
            .listener(stepExecutionNotificationListener)
            .listener(chunkExecutionListener)
            .build();

}

@Bean
public Job myJob() {
    return jobBuilderFactory.get(JOB_NAME)
            .incrementer(new RunIdIncrementer())
            .listener(jobCompletionNotificationListener)
            .flow(myStep()).end().build();

}

@Bean
@StepScope
public JdbcCursorItemReader<MyModel> myModelReader(){
    JdbcCursorItemReader<MyModel> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    reader.setVerifyCursorPosition(false);
    reader.setSql("my query fetching millions of records joining multiple tables from the db");
    reader.setRowMapper(new MyModelRowMapper());

    return reader;
}

public class MyModelRowMapperimplements RowMapper<MyModel>{

    @Override
    public MyModel mapRow(ResultSet rs, int rowNum) throws SQLException {
      MyModel myModel = new MyModel();
      myModel.setEmailAddress(checkIsEmpty(rs.getString("EMAIL_ADDRESS")) ? "" : rs.getString("EMAIL_ADDRESS").replace("|", "")); // ----- The line which is failing!!! -----
      return person;             
    }

}
public boolean checkIsEmpty(String stringToCheck)
{
    if(stringToCheck==null || stringToCheck.isEmpty() || stringToCheck.equals("null"))
    {
        return true;
    }
    return false;
}
public TaskExecutor taskExecutor(){
    ThreadPoolTaskExecutor threadPoolTaskExecutor=new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(10);
    threadPoolTaskExecutor.setMaxPoolSize(25);
    threadPoolTaskExecutor.setQueueCapacity(5);
    threadPoolTaskExecutor.setThreadNamePrefix("MyModelBatch-");
    threadPoolTaskExecutor.afterPropertiesSet();
    return threadPoolTaskExecutor;
}

编辑 1

除了在非线程上下文中工作之外,如果我使用一次结果集,它也可以工作。我将代码更改为

String email = rs.getString("EMAIL_ADDRESS");
myModel.setEmailAddress(checkIsEmpty(email) ? "" : email.replace("|", ""));

标签: javaspringmultithreadingspring-bootspring-batch

解决方案


JdbcCursorItemReader不是线程安全的(请参阅它的javadoc和此答案中的更多详细信息)。这样做的原因是它包装了一个ResultSet不是线程安全的单个。

因此,您的问题是由于在多线程步骤中使用了非线程安全的项目阅读器。根据Javadoc:

每次调用read()都会调用提供的 RowMapper,并传入 ResultSet。

由于read不同步,每个线程都可以调用它来读取项目。

要解决您的问题,您可以将 Jdbc 阅读器包装在SynchronizedItemStreamReader.


推荐阅读