spring-boot - Spring批处理本地分区作业运行太慢
问题描述
我正在从 mongodb 读取 160 万条记录并写入 oracle db。这是我的工作配置。
//作业配置
@Bean
public Job dataImportForACollectionsJob() {
return jobBuilderFactory.get("dataImportForACollectionsJob")
.start(importDataForACollectionMasterStep)
.listener(listener)
.build();
}
//主步骤配置
@Bean
public Step importDataForACollectionMasterStep() {
return this.stepBuilderFactory.get("importDataForACollectionMasterStep")
.partitioner(importDataForACollectionWorkerStep.getName(),partitionerForJobParam(null))
.partitionHandler(importDataForACollectionPartitionHandler)
.step(importDataForACollectionWorkerStep)
.listener(stepListener())
.build();
}
//分区配置
@Bean
@StepScope
public Partitioner partitionerForJobParam(@Value("#{jobParameters['collectionName']}")String
collectionName) {
MongoPartitioner partitioner = new MongoPartitioner();
partitioner.setCollectionName(collectionName);
return partitioner;
}
//工作步骤配置
@Bean
public Step importDataForACollectionWorkerStep() {
return this.stepBuilderFactory.get("importDataForACollectionWorkerStep")
.<Document, DataRecovery>chunk(10)
.reader(mongoDbItemReader(null,null,null))
.processor(batchItemProcessorFromJobParam(null))
.writer(itemWriter())
.build();
}
//阅读器配置
@Bean
@StepScope
public MongoItemReader<Document> mongoDbItemReader(@Value("#{stepExecutionContext['gte']}") Long gte,
@Value("#{stepExecutionContext['lt']}") Long lt,
@Value("#
{stepExecutionContext['collectionName']}") String collectionName) {
MongoItemReader<Document> mongoItemReader = new MongoItemReader<>();
List<Object> parameterList=new ArrayList<Object>();
parameterList.add(gte);
parameterList.add(lt);
mongoItemReader.setTemplate(mongoTemplate);
mongoItemReader.setCollection(collectionName);
mongoItemReader.setQuery("{id:{$gt:?0, $lte:?1}}");
mongoItemReader.setParameterValues(parameterList);
mongoItemReader.setTargetType(Document.class);
mongoItemReader.setQuery(new Query().limit(5));
mongoItemReader.setSort(new HashMap<String, Sort.Direction>() {{
put("_id", Sort.Direction.DESC);
}});
mongoItemReader.setPageSize(5);
return mongoItemReader;
}
//编写器配置
@Bean
public JdbcBatchItemWriter<WR3DataRecovery> itemWriter() {
JdbcBatchItemWriter<WR3DataRecovery> jdbcBatchItemWriter = new JdbcBatchItemWriter<>();
jdbcBatchItemWriter.setItemSqlParameterSourceProvider(new
BeanPropertyItemSqlParameterSourceProvider<DataRecovery>());
jdbcBatchItemWriter.setSql("INSERT INTO
TABLE(ID,APPID,AP,FY,OBJECT_NAME,OBJECT_SEQ_ID,"
+"SCOPE_NAME,TEMPLATE_NAME,OBJECT_INFO,ACTIVE,IS_PRIMARY,CREATED_BY,CREATED_DATE,LAST_UPDATED_BY,LAST_UPDATED_DATE)" +
" VALUES(ECORERPTMD_CORE.hibernate_sequence.nextval,:appId,:aP,:fY,:objectName,:objectSeqId,:scopeName,:templateName," +
":objectInfo,:active,:isPrimary,:createdBy,:createdDate,:lastUpdatedBy,:lastUpdatedDate)");
jdbcBatchItemWriter.setDataSource(dataSource);
return jdbcBatchItemWriter;
}
// 分区处理程序配置
@Bean
public PartitionHandler importDataForACollectionPartitionHandler() {
return taskExecutorPartitionHandler(importDataForACollectionWorkerStep);
}
private TaskExecutorPartitionHandler taskExecutorPartitionHandler(Step step){
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(threadPoolTaskExecutor());
retVal.setStep(step);
retVal.setGridSize(10);
return retVal;
}
private ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(10);
taskExecutor.setThreadNamePrefix("threadPoolTaskExecutor-");
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
//分区逻辑
Long partitionCount = count/gridSize;
Long gte = 0L;
Long lt = 0L;
for(int j = 0; j < gridSize; j++) {
if(j == gridSize-1) {
lt = count;
}else{
lt += partitionCount;
}
result.put("partition"+j, createExecutionContext("partition"+j,
gte.toString(), lt.toString(),getCollectionName()));
gte+= partitionCount;
}
//作业仓库配置
@Configuration
public class JobMetaDataLocalConfig extends DefaultBatchConfigurer {
@Override
@Autowired
public void setDataSource(DataSource dataSource) {
}
}
我在我的本地 Windows 10 机器上运行。以下是我的分区执行统计信息。
步骤:[importDataForACollectionWorkerStep:partition8] 执行时间为 21s965ms
步骤:[importDataForACollectionWorkerStep:partition5] 执行时间为 10s493ms
Step: [importDataForACollectionWorkerStep:partition2] 执行时间为 2m7s236ms
Step: [importDataForACollectionWorkerStep:partition4] 执行时间为 4m0s52ms
步骤:[importDataForACollectionWorkerStep:partition7] 执行时间为 18m36s676ms
步骤:[importDataForACollectionWorkerStep:partition6] 执行时间为 15h48m39s976ms
//分区日志
partition0:{partition=partition0,lt=316,gte=0,collectionName=GenericDataDomain}
partition1:{partition=partition1,lt=632,gte=316,collectionName=GenericDataDomain}
partition2:{partition=partition2,lt=948,gte=632,collectionName=GenericDataDomain}
partition3:{partition=partition3,lt=1264,gte=948,collectionName=GenericDataDomain}
partition4:{partition=partition4,lt=1580,gte=1264,collectionName=GenericDataDomain}
partition5:{partition=partition5,lt=1896,gte=1580,collectionName=GenericDataDomain}
partition6:{partition=partition6,lt=2212,gte=1896,collectionName=GenericDataDomain}
partition7:{partition=partition7,lt=2528,gte=2212,collectionName=GenericDataDomain}
partition8:{partition=partition8,lt=2844,gte=2528,collectionName=GenericDataDomain}
partition9:{partition=partition9,lt=3162,gte=2844,collectionName=GenericDataDomain}
如果你看上面的分区步骤执行,最后一个步骤 partition6 已经花费了 15 小时,剩余的步骤分区仍在运行。为什么 step partition6 花了 15hrs 其他人花了更少的时间?下面的spring批处理本地分区作业配置有什么问题?
感谢您的投入。
解决方案
批次因素可能会导致这种延迟,以下几点将有助于识别问题
- 操作系统窗口。如果您在 Linux 中尝试,您将看到真正的基准测试。
- 尝试使用 Ken 提到的单个分区来缩小延迟。
- Windows 和内存的 CPU 配置。
- 读取所选分区的任何延迟。
- 数据源连接池配置。
如果仍然无法识别 github 中的共享代码,我们可以使用 Mongo 和 Oracle 进行验证。
推荐阅读
- javascript - 将十六进制转换为 BigInt 节点红色
- javascript - 使用 Jquery 在页面上定位第一次和第二次出现的 Div
- qt - 在 Qt 中的 Table 中显示文件夹中的数据条目(xml)
- c++ - 仅计算“核心”QImage 数据(不包括元数据)的 QCryptographicHash
- c# - 使用带有空 Generic.List 作为参数的 AddRange 时 Generic.List 引发的参数异常
- symfony - 如何修复:错误:u0_.id 列不存在
- c# - 使用 EnableRetryOnFailure 时瞬时失败后的错误 SQL 结果
- oracle - 限制最初在选择列表中返回的行
- apache-nifi - 无法在 Apache-NiFi 中提取 Json 数据
- ios - MapKit:两个注释之间未显示路线