首页 > 解决方案 > Spring批处理本地分区作业运行太慢

问题描述

我正在从 mongodb 读取 160 万条记录并写入 oracle db。这是我的工作配置。

//作业配置

@Bean
public Job dataImportForACollectionsJob() {
return jobBuilderFactory.get("dataImportForACollectionsJob")
            .start(importDataForACollectionMasterStep)
            .listener(listener)
            .build();

}

//主步骤配置

@Bean
public Step importDataForACollectionMasterStep() {
    return this.stepBuilderFactory.get("importDataForACollectionMasterStep")
            .partitioner(importDataForACollectionWorkerStep.getName(),partitionerForJobParam(null))
            .partitionHandler(importDataForACollectionPartitionHandler)
            .step(importDataForACollectionWorkerStep)
            .listener(stepListener())
            .build();

}

//分区配置

@Bean
@StepScope
public Partitioner partitionerForJobParam(@Value("#{jobParameters['collectionName']}")String 
 collectionName) {
    MongoPartitioner partitioner = new MongoPartitioner();
    partitioner.setCollectionName(collectionName);
    return partitioner;
}

//工作步骤配置

@Bean
public Step importDataForACollectionWorkerStep() {
    return this.stepBuilderFactory.get("importDataForACollectionWorkerStep")
            .<Document, DataRecovery>chunk(10)
            .reader(mongoDbItemReader(null,null,null))
            .processor(batchItemProcessorFromJobParam(null))
            .writer(itemWriter())
            .build();
}

//阅读器配置

@Bean
@StepScope
public MongoItemReader<Document> mongoDbItemReader(@Value("#{stepExecutionContext['gte']}") Long gte,
                                                   @Value("#{stepExecutionContext['lt']}") Long lt,
                                                   @Value("# 
    {stepExecutionContext['collectionName']}") String collectionName) {
    MongoItemReader<Document> mongoItemReader = new MongoItemReader<>();
    List<Object> parameterList=new ArrayList<Object>();
    parameterList.add(gte);
    parameterList.add(lt);
    mongoItemReader.setTemplate(mongoTemplate);
    mongoItemReader.setCollection(collectionName);
    mongoItemReader.setQuery("{id:{$gt:?0, $lte:?1}}");
    mongoItemReader.setParameterValues(parameterList);
    mongoItemReader.setTargetType(Document.class);
    mongoItemReader.setQuery(new Query().limit(5));
    mongoItemReader.setSort(new HashMap<String, Sort.Direction>() {{
        put("_id", Sort.Direction.DESC);
    }});
    mongoItemReader.setPageSize(5);
    return mongoItemReader;
}

//编写器配置

@Bean
public JdbcBatchItemWriter<WR3DataRecovery> itemWriter() {
    JdbcBatchItemWriter<WR3DataRecovery> jdbcBatchItemWriter = new JdbcBatchItemWriter<>();
    jdbcBatchItemWriter.setItemSqlParameterSourceProvider(new 
    BeanPropertyItemSqlParameterSourceProvider<DataRecovery>());
    jdbcBatchItemWriter.setSql("INSERT INTO 
    TABLE(ID,APPID,AP,FY,OBJECT_NAME,OBJECT_SEQ_ID," 
    +"SCOPE_NAME,TEMPLATE_NAME,OBJECT_INFO,ACTIVE,IS_PRIMARY,CREATED_BY,CREATED_DATE,LAST_UPDATED_BY,LAST_UPDATED_DATE)" +
            " VALUES(ECORERPTMD_CORE.hibernate_sequence.nextval,:appId,:aP,:fY,:objectName,:objectSeqId,:scopeName,:templateName," +
            ":objectInfo,:active,:isPrimary,:createdBy,:createdDate,:lastUpdatedBy,:lastUpdatedDate)");
    jdbcBatchItemWriter.setDataSource(dataSource);
    return jdbcBatchItemWriter;
}

// 分区处理程序配置

@Bean
public PartitionHandler importDataForACollectionPartitionHandler() {
    return taskExecutorPartitionHandler(importDataForACollectionWorkerStep);
}


private TaskExecutorPartitionHandler taskExecutorPartitionHandler(Step step){
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(threadPoolTaskExecutor());
    retVal.setStep(step);
    retVal.setGridSize(10);
    return retVal;
}


private ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(10);
    taskExecutor.setThreadNamePrefix("threadPoolTaskExecutor-");
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

//分区逻辑

Long partitionCount = count/gridSize;
Long gte = 0L;
Long lt = 0L;

for(int j = 0; j < gridSize; j++) {
    if(j == gridSize-1) {
            lt = count;
    }else{
          lt += partitionCount;
    }
    result.put("partition"+j, createExecutionContext("partition"+j, 
    gte.toString(), lt.toString(),getCollectionName()));
        gte+= partitionCount;
}

//作业仓库配置

@Configuration
public class JobMetaDataLocalConfig extends DefaultBatchConfigurer {
   @Override
   @Autowired
   public void setDataSource(DataSource dataSource) {
   }
 }

我在我的本地 Windows 10 机器上运行。以下是我的分区执行统计信息。

步骤:[importDataForACollectionWorkerStep:partition8] 执行时间为 21s965ms

步骤:[importDataForACollectionWorkerStep:partition5] 执行时间为 10s493ms

Step: [importDataForACollectionWorkerStep:partition2] 执行时间为 2m7s236ms

Step: [importDataForACollectionWorkerStep:partition4] 执行时间为 4m0s52ms

步骤:[importDataForACollectionWorkerStep:partition7] 执行时间为 18m36s676ms

步骤:[importDataForACollectionWorkerStep:partition6] 执行时间为 15h48m39s976ms

//分区日志

partition0:{partition=partition0,lt=316,gte=0,collectionName=GenericDataDomain}

partition1:{partition=partition1,lt=632,gte=316,collectionName=GenericDataDomain}

partition2:{partition=partition2,lt=948,gte=632,collectionName=GenericDataDomain}

partition3:{partition=partition3,lt=1264,gte=948,collectionName=GenericDataDomain}

partition4:{partition=partition4,lt=1580,gte=1264,collectionName=GenericDataDomain}

partition5:{partition=partition5,lt=1896,gte=1580,collectionName=GenericDataDomain}

partition6:{partition=partition6,lt=2212,gte=1896,collectionName=GenericDataDomain}

partition7:{partition=partition7,lt=2528,gte=2212,collectionName=GenericDataDomain}

partition8:{partition=partition8,lt=2844,gte=2528,collectionName=GenericDataDomain}

partition9:{partition=partition9,lt=3162,gte=2844,collectionName=GenericDataDomain}

如果你看上面的分区步骤执行,最后一个步骤 partition6 已经花费了 15 小时,剩余的步骤分区仍在运行。为什么 step partition6 花了 15hrs 其他人花了更少的时间?下面的spring批处理本地分区作业配置有什么问题?

感谢您的投入。

标签: spring-bootspring-batch

解决方案


批次因素可能会导致这种延迟,以下几点将有助于识别问题

  1. 操作系统窗口。如果您在 Linux 中尝试,您将看到真正的基准测试。
  2. 尝试使用 Ken 提到的单个分区来缩小延迟。
  3. Windows 和内存的 CPU 配置。
  4. 读取所选分区的任何延迟。
  5. 数据源连接池配置。

如果仍然无法识别 github 中的共享代码,我们可以使用 Mongo 和 Oracle 进行验证。


推荐阅读