java - Spring Batch - 分块和多线程步骤 - RowMapper 中的 Nullpointer 异常
问题描述
当我在多个线程中运行我的步骤时,我在我的行映射器中得到一个空指针异常,同时处理结果集,即使对于具有显式空检查的条目也是如此。taskExecutor()
当我在没有/on 单线程的情况下执行它时工作正常。我对几件事感到困惑。我的理解是,如果我将提交间隔指定为 100,核心线程计数为 10,则每个线程都会拉出 100 个块并独立处理它。
- 分块-读取器-行映射器三重奏如何工作?如果我的阅读器中有一个查询获取 100 万行和 1000 块大小,这是否意味着阅读器将访问数据库 1000 次?并且在每次行映射器将映射所有获取的 1000 行之后? 线程如何影响行映射器?
代码如下:
@Bean
public Step myStep() {
return stepBuilderFactory.get(STEP_NAME).<MyModel, MyModel> chunk(1000)
.reader(myModelReader())
.writer(myModelWriter())
.taskExecutor(taskExecutor())
.listener(stepExecutionNotificationListener)
.listener(chunkExecutionListener)
.build();
}
@Bean
public Job myJob() {
return jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.listener(jobCompletionNotificationListener)
.flow(myStep()).end().build();
}
@Bean
@StepScope
public JdbcCursorItemReader<MyModel> myModelReader(){
JdbcCursorItemReader<MyModel> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setVerifyCursorPosition(false);
reader.setSql("my query fetching millions of records joining multiple tables from the db");
reader.setRowMapper(new MyModelRowMapper());
return reader;
}
public class MyModelRowMapperimplements RowMapper<MyModel>{
@Override
public MyModel mapRow(ResultSet rs, int rowNum) throws SQLException {
MyModel myModel = new MyModel();
myModel.setEmailAddress(checkIsEmpty(rs.getString("EMAIL_ADDRESS")) ? "" : rs.getString("EMAIL_ADDRESS").replace("|", "")); // ----- The line which is failing!!! -----
return person;
}
}
public boolean checkIsEmpty(String stringToCheck)
{
if(stringToCheck==null || stringToCheck.isEmpty() || stringToCheck.equals("null"))
{
return true;
}
return false;
}
public TaskExecutor taskExecutor(){
ThreadPoolTaskExecutor threadPoolTaskExecutor=new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setMaxPoolSize(25);
threadPoolTaskExecutor.setQueueCapacity(5);
threadPoolTaskExecutor.setThreadNamePrefix("MyModelBatch-");
threadPoolTaskExecutor.afterPropertiesSet();
return threadPoolTaskExecutor;
}
编辑 1
除了在非线程上下文中工作之外,如果我使用一次结果集,它也可以工作。我将代码更改为
String email = rs.getString("EMAIL_ADDRESS");
myModel.setEmailAddress(checkIsEmpty(email) ? "" : email.replace("|", ""));
解决方案
推荐阅读
- mysql - 在这种情况下如何使用 SQL 的滞后功能?
- python - 如何去除power bi中Python图的顶部和底部空白?
- python - 如何使用 Robot Framework 按行和列更新 CSV 文件
- python - AttributeError:模块'scipy.interpolate'没有属性'spline'
- c++ - 带空格的字符串/字符输入
- java - Java用String实例化一个类
- java - 将 Elasticsearch JSON 响应转换为 Elasticsearch BulkResponse
- python - Smtp 问题,例如 'smtplib.SMTPConnectError: (-1, b'Microsoft Exchange IMAP4 服务已准备好。')'
- django - AWS Cognito 调用上的 Zappa Django Rest Framework API 超时
- python - 为什么链接器在任何地方都没有指定时寻找 python36_d.lib?