
Problem description

Requirements:

I have created an application for a few tables. The jobs are configured with a JdbcCursorItemReader and a MongoItemWriter, and local partitioning is configured to improve job performance. Running one job at a time performs well, but when several jobs run in parallel, the execution time of each individual job grows.

Single job run: Job1 => 500k records => 2m 5s 418ms

Running multiple jobs in parallel: Job1 => 500k records => 3m 15s 539ms

I use a ThreadPoolTaskExecutor in every job.
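
A likely contributor is thread oversubscription: every job defines its own 64-thread pool (see taskExecutor1 below), so N jobs running in parallel contend for the same CPU cores and database connections. A minimal sketch of a single shared, bounded executor that all jobs could inject instead; the bean name and pool sizes are illustrative assumptions, not taken from the original configuration:

import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class SharedExecutorConfig {

    // One bounded pool shared by every partitioned step, sized to the
    // hardware rather than per job, so parallel jobs queue instead of
    // oversubscribing the CPU. All sizes here are illustrative.
    @Bean
    public TaskExecutor sharedStepExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
        executor.setQueueCapacity(256);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("shared-step-");
        return executor;
    }
}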

My job configuration:

package com.sample.oracletomongo.batchconfig;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.data.MongoItemWriter;
import org.springframework.batch.item.data.builder.MongoItemWriterBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.sample.oracletomongo.AdditionalBatchConfiguration;
import com.sample.oracletomongo.ColumnPartitioner;
import com.sample.oracletomongo.CustomParametersIncrementerImpl;
import com.sample.oracletomongo.RowMapper.Sample_RowMapper;
import com.sample.oracletomongo.bean.Sample;

@Configuration
@EnableBatchProcessing
@Import(AdditionalBatchConfiguration.class)
public class Sample_BatchConfig {
    @Autowired
    public DataSource datasource;

    @Autowired
    public MongoTemplate mongotemplate;

    @Autowired
    public JdbcTemplate jdbctemplate;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
        
    // Key columns used for grouping/ordering in the partition range query.
    public List<String> keycolumns() {
        List<String> keycolumn = new ArrayList<String>();
        keycolumn.add("NO");
        keycolumn.add("TYPE");
        return keycolumn;
    }

    // Non-key columns that are only selected, not ordered on.
    public List<String> nonkeycolumns() {
        List<String> nonkeycolumns = new ArrayList<String>();
        nonkeycolumns.add("DATE");
        return nonkeycolumns;
    }
        
     
    @Bean
    @StepScope
    public ColumnPartitioner itemmeasurelocalpartitioner(
            @Value("#{jobParameters['deltadate']}") String deltadate,
            @Value("#{jobParameters['rundate']}") String rundate) {
        ColumnPartitioner columnRangePartitioner = new ColumnPartitioner();
        columnRangePartitioner.setKeycolumn(keycolumns());
        columnRangePartitioner.setNonkeycolumn(nonkeycolumns());
        columnRangePartitioner.setJdbctemplate(jdbctemplate);
        columnRangePartitioner.setTable("Sample");
        columnRangePartitioner.setDeltadate(deltadate);
        columnRangePartitioner.setRundate(rundate);
        return columnRangePartitioner;
    }
     
    @Bean
    @StepScope
    public JdbcCursorItemReader<Sample> itemReader(
            @Value("#{stepExecutionContext[rangeSql]}") String rangesql) {
        System.out.println("RangeSQL" + rangesql);
        return new JdbcCursorItemReaderBuilder<Sample>()
                .dataSource(datasource) // change accordingly if you use another data source
                .name("Reader")
                .sql(rangesql)
                .rowMapper(new Sample_RowMapper())
                .build();
    }

    @Bean
    public MongoItemWriter<Sample> writerlocal(MongoTemplate mongoTemplate) {
        return new MongoItemWriterBuilder<Sample>()
                .template(mongoTemplate)
                .collection("Sample")
                .build();
    }

    // Alternative: @Bean public TaskExecutor taskExecutor() { return new SimpleAsyncTaskExecutor("spring_batch"); }
    @Bean
    public Job sampleJob() {
        System.out.println("DS : " + datasource);
        return jobBuilderFactory.get("sample")
                .incrementer(new CustomParametersIncrementerImpl(datasource))
                .start(masterstep())
                .build();
    }
    // Executor for the partitioned step. Note: this 64-thread pool is defined
    // per job configuration class, so N jobs running in parallel can create
    // N * 64 threads in total.
    @Bean
    public TaskExecutor taskExecutor1() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(64);
        executor.setMaxPoolSize(64);
        executor.setQueueCapacity(64);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("MultiThreaded1-");
        return executor;
    }
        
    // Master step: partitions the table into ranges and runs the slave step
    // on taskExecutor1 with a grid size of 4.
    @Bean
    public Step masterstep() {
        return stepBuilderFactory.get("masterstep")
                .partitioner("ItemAttributeslavestepSlave", itemmeasurelocalpartitioner(null, null))
                .step(slavestep())
                .gridSize(4)
                .taskExecutor(taskExecutor1())
                .build();
    }
        
        //SlaveStep
        @Bean

        public Step slavestep() {
       
            return stepBuilderFactory.get("slavestep")
                    .<Sample, Sample> chunk(90)
                    .reader(itemReader(null))                   
                    .writer(writerlocal(mongotemplate))
                    .build();
        }



    

}
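
Each partition holds its own JDBC cursor for the duration of the step, so four partitions per job times N parallel jobs need roughly 4N concurrent connections. If the connection pool is smaller than that (Spring Boot's default HikariCP pool is 10 connections), readers block waiting for a connection and per-job times grow. A sketch of sizing the pool explicitly, assuming HikariCP; the URL, credentials, and numbers are illustrative:

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

@Configuration
public class DataSourceConfig {

    // Size the pool for (partitions per job) * (jobs expected in parallel),
    // plus headroom for the partitioner's count query. All values here are
    // illustrative assumptions.
    @Bean
    public DataSource datasource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:oracle:thin:@//dbhost:1521/ORCL");
        config.setUsername("batch_user");
        config.setPassword("secret");
        config.setMaximumPoolSize(4 * 5 + 4); // 4 partitions x 5 jobs + headroom
        return new HikariDataSource(config);
    }
}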


package com.sample.oracletomongo;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.Assert;

public class ColumnPartitioner implements Partitioner {

    public JdbcTemplate jdbctemplate;

    private String table;
    private List<String> keycolumn;
    private List<String> nonkeycolumn;
    private String deltadate;
    private String rundate;
    public List<String> getNonkeycolumn() {
        return nonkeycolumn;
    }

    public void setNonkeycolumn(List<String> nonkeycolumn) {
        this.nonkeycolumn = nonkeycolumn;
    }

    public String getDeltadate() {
        return deltadate;
    }

    public void setDeltadate(String deltadate) {
        this.deltadate = deltadate;
    }

    public String getRundate() {
        return rundate;
    }

    public void setRundate(String rundate) {
        this.rundate = rundate;
    }

    public JdbcTemplate getJdbctemplate() {
        return jdbctemplate;
    }

    public void setJdbctemplate(JdbcTemplate jdbctemplate) {
        this.jdbctemplate = jdbctemplate;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public List<String> getKeycolumn() {
        return keycolumn;
    }

    public void setKeycolumn(List<String> keycolumn) {
        this.keycolumn = keycolumn;
    }
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();

        // Count the rows in the delta window. Note: the dates are concatenated
        // straight into the SQL; bind parameters would be safer.
        int totalRecords = jdbctemplate.queryForObject(
                "select count(1) from " + table + " where DATE between TO_DATE('"
                        + getDeltadate() + "','YYYY-MM-DD HH24:MI:SS') and TO_DATE('"
                        + getRundate() + "','YYYY-MM-DD HH24:MI:SS')",
                Integer.class);

        int noOfRecordsPerRange = totalRecords / gridSize;

        if (totalRecords >= gridSize) {
            for (int gridNo = 1; gridNo <= gridSize; gridNo++) {
                ExecutionContext value = new ExecutionContext();

                String recordRange = getRecordOffset(gridSize, gridNo,
                        noOfRecordsPerRange, totalRecords);
                value.put("rowCountRange", recordRange);

                System.out.println("Processing from : " + recordRange.split("#")[0]
                        + " to " + recordRange.split("#")[1]);

                // give each thread a name: thread 1, 2, 3...
                value.putString("name", "Thread" + gridNo);
                String sql = frameRangeQuery(recordRange, table);
                value.putString("rangeSql", sql);
                result.put("partition" + gridNo, value);
            }
        } else {
            // Fewer records than grids (e.g. totalRecords = 1, gridSize = 5):
            // fall back to a single partition covering everything.
            ExecutionContext value = new ExecutionContext();
            gridSize = 1;
            int gridNo = 1;
            String recordRange = "0#" + totalRecords;
            value.put("rowCountRange", recordRange);

            System.out.println("Processing from : " + recordRange.split("#")[0] + " to "
                    + recordRange.split("#")[1]);

            value.putString("name", "Thread" + gridNo);
            String sql = frameRangeQuery(recordRange, table);
            value.putString("rangeSql", sql);
            result.put("partition" + gridNo, value);
        }

        return result;
    }
    
    /*
     * The range returned is meant for queries using the OFFSET ... ROWS FETCH
     * NEXT ... ROWS ONLY clause, e.g. OFFSET 0 ROWS FETCH NEXT 10 ROWS ONLY,
     * OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY, etc.
     */
    public static String getRecordOffset(int gridSize, int gridNo,
            int noOfRecordsPerRange, int totalRecords) {
        String recordRange;
        int offset;

        if (gridNo == 1)
            offset = 0;
        else
            offset = noOfRecordsPerRange * (gridNo - 1);

        // Send all the remaining records to the last batch
        if (gridNo == gridSize)
            recordRange = offset + "#" + (totalRecords - offset);
        else
            recordRange = offset + "#" + noOfRecordsPerRange;

        return recordRange;
    }
    
    public String frameRangeQuery(String rowCountRange, String tableName) {
        Assert.notNull(tableName, "tableName must not be null");
        Assert.notNull(rowCountRange, "rowCountRange must not be null");

        StringBuilder rangeQuery = new StringBuilder("SELECT ");
        StringBuilder groupByClause = new StringBuilder(" group by ");
        StringBuilder orderByClause = new StringBuilder(" order by ");
        StringBuilder outerQuery = new StringBuilder(" SELECT * FROM ( ");
        StringBuilder whereclause = new StringBuilder();

        if (!rowCountRange.isEmpty()) {
            String[] range = rowCountRange.split("#");
            Integer numOfRows = Integer.valueOf(range[1]);
            Integer offset = Integer.valueOf(range[0]);

            // Join the key and non-key column names into comma-separated lists.
            StringBuilder keyColumns = new StringBuilder();
            StringBuilder nonkeycolumns = new StringBuilder();
            for (Iterator<String> iter = getKeycolumn().iterator(); iter.hasNext();) {
                keyColumns.append(iter.next());
                keyColumns.append(iter.hasNext() ? ", " : " ");
            }
            for (Iterator<String> iter = getNonkeycolumn().iterator(); iter.hasNext();) {
                nonkeycolumns.append(iter.next());
                nonkeycolumns.append(iter.hasNext() ? ", " : " ");
            }

            rangeQuery.append(keyColumns + ", " + nonkeycolumns);
            // groupByClause is built here but never appended to the final query.
            groupByClause.append(keyColumns);
            orderByClause.append(keyColumns);

            // Note: this filters on UPD_DATE while the count query in
            // partition() filters on DATE; the two should probably match.
            whereclause.append("WHERE UPD_DATE between ");
            whereclause.append("TO_DATE('" + deltadate + "','YYYY-MM-DD HH24:MI:SS') AND ");
            whereclause.append("TO_DATE('" + rundate + "','YYYY-MM-DD HH24:MI:SS') ");

            rangeQuery.append(" FROM " + tableName + " " + whereclause + orderByClause);
            outerQuery.append(rangeQuery + ") e OFFSET " + offset
                    + " ROWS FETCH NEXT " + numOfRows + " ROWS ONLY");
        }

        System.out.println("Range Query:" + outerQuery.toString());
        return outerQuery.toString();
    }
}
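
To make the range arithmetic concrete, here is a small standalone example (not part of the original code, and assuming ColumnPartitioner is on the classpath) showing how 500,000 records split across a grid of 4:

public class RangeDemo {
    public static void main(String[] args) {
        int totalRecords = 500_000;
        int gridSize = 4;
        int perRange = totalRecords / gridSize;
        for (int gridNo = 1; gridNo <= gridSize; gridNo++) {
            // Prints 0#125000, 125000#125000, 250000#125000, 375000#125000
            // (offset#rowCount for each partition's OFFSET/FETCH NEXT clause).
            System.out.println(ColumnPartitioner.getRecordOffset(
                    gridSize, gridNo, perRange, totalRecords));
        }
    }
}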

Additional batch configuration:

package com.sample.oracletomongo;

import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class AdditionalBatchConfiguration {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private JobRegistry jobRegistry;

    // @Autowired private Job importUserJob;

    @Autowired
    private JobExplorer jobExplorer;

    // Alternative: public SimpleAsyncTaskExecutor createSimpleAsyncTaskExecutor() { return new SimpleAsyncTaskExecutor(); }

    public ThreadPoolTaskExecutor createThreadPoolExecutor() {
        ThreadPoolTaskExecutor t1 = new ThreadPoolTaskExecutor();
        t1.setCorePoolSize(20);
        t1.setMaxPoolSize(30);
        t1.setQueueCapacity(30);
        t1.initialize();
        return t1;
    }

    // Not a @Bean: called directly from jobOperator() below, so this
    // pool-backed (asynchronous) launcher is used only by the JobOperator.
    public JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(createThreadPoolExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
        
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
        jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
        return jobRegistryBeanPostProcessor;
    }

    @Bean
    public JobOperator jobOperator() throws Exception {
        SimpleJobOperator jobOperator = new SimpleJobOperator();
        jobOperator.setJobExplorer(jobExplorer);
        jobOperator.setJobLauncher(createJobLauncher());
        jobOperator.setJobRegistry(jobRegistry);
        jobOperator.setJobRepository(jobRepository);
        return jobOperator;
    }
}
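
Because SimpleJobOperator hands each startNextInstance call to the launcher's executor, the pool above (core 20, max 30) lets many jobs run at once. If the goal is to cap how many jobs execute concurrently, one option is to shrink that pool; a hedged sketch with an illustrative limit of 2 (the limit is an assumption, and queueCapacity must be large enough to hold the jobs that wait):

    public ThreadPoolTaskExecutor createThreadPoolExecutor() {
        // At most 2 jobs execute at a time; the rest sit in the queue until
        // a pool thread frees up. The limit of 2 is illustrative only.
        ThreadPoolTaskExecutor t1 = new ThreadPoolTaskExecutor();
        t1.setCorePoolSize(2);
        t1.setMaxPoolSize(2);
        t1.setQueueCapacity(100);
        t1.initialize();
        return t1;
    }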

The controller:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class JobController {

    @Autowired
    private JobOperator jobOperator;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private JobRegistry jobregistry;

    @Autowired
    public DataSource datasource;

    @GetMapping("/Run/AllJobs")
    public Map<String, Long> triggerAllJobs() throws NoSuchJobException {
        Long executionId = null;
        BatchStatus status = null;

        // Collect every registered job whose name starts with "Mongo" and
        // log the parameters its incrementer would produce.
        List<String> jobnames = new ArrayList<String>();
        for (String names : jobregistry.getJobNames()) {
            if (names.startsWith("Mongo")) {
                jobnames.add(names);
                Job job = jobregistry.getJob(names);
                JobParameters parameters = new JobParametersBuilder(jobExplorer)
                        .getNextJobParameters(job)
                        .toJobParameters();
                System.out.println("Jobnames :" + names + " parameters :" + parameters.toString());
            }
        }
        System.out.println("After JobNames:" + Collections.singletonList(jobnames));

        // Trigger all the jobs at once.
        Map<String, Long> result = new HashMap<String, Long>();
        for (String jobs : jobnames) {
            try {
                executionId = jobOperator.startNextInstance(jobs);
                result.put(jobs, executionId);
                status = jobExplorer.getJobExecution(executionId).getStatus();
                System.out.println("status" + status);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
        System.out.println(Collections.singletonList(result));
        return result;
    }
}
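
Calling GET /Run/AllJobs returns a map of job name to the execution id that was started, e.g. {"MongoSampleJob": 42, "MongoOtherJob": 43} (names and ids illustrative). Note that because the launcher behind the JobOperator is asynchronous, the status printed right after startNextInstance is typically STARTING or STARTED rather than COMPLETED.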

Tags: spring, mongodb, spring-boot, spring-batch
