Spring Boot batch processing framework
Problem description
Requirements:
- Read data from Oracle and write it to MongoDB. The jobs should run daily.
- There are more than 200 tables, so 200+ jobs have to be created.
- Launch the jobs asynchronously from a single API call and run at least 20 of them in parallel.
- Many tables contain more than a billion records.
- On the first run a job should load the entire table from Oracle into Mongo; each daily run after that should load only the incremental updates (a possible daily trigger is sketched after this list).
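The daily trigger could look roughly like the sketch below. This is an assumption, not code from the current application: the DailyJobScheduler class, the cron expression, and the lastRunDate() helper are placeholders, it presumes @EnableScheduling is active, and it passes the delta window explicitly instead of going through the incrementer; only the deltadate/rundate parameter names come from the job configuration further down.

package com.sample.oracletomongo;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Sketch only: class name, schedule, and lastRunDate() are placeholders.
@Component
public class DailyJobScheduler {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job sampleJob;

    @Scheduled(cron = "0 0 1 * * *") // every day at 01:00
    public void runDaily() throws Exception {
        String rundate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        JobParameters params = new JobParametersBuilder()
                .addString("deltadate", lastRunDate()) // end of the previous delta window
                .addString("rundate", rundate)
                .toJobParameters();
        jobLauncher.run(sampleJob, params);
    }

    // Placeholder: in practice the last successful run date would come from
    // the job repository or a control table.
    private String lastRunDate() {
        return "2023-01-01 00:00:00";
    }
}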
I have already built the application for some of the tables. The jobs are configured with a JdbcCursorItemReader and a MongoItemWriter, and local partitioning is set up to improve job performance. Performance is good when one job runs at a time, but when several jobs run in parallel, the execution time of each individual job increases.
Single job running alone: Job1 => 500k records => 2m 5s 418ms
The same job with multiple jobs running: Job1 => 500k records => 3m 15s 539ms
Each job uses its own ThreadPoolTaskExecutor, so with gridSize = 4 per job, 20 jobs in parallel run roughly 20 × 4 = 80 partition threads against the same Oracle and Mongo instances.
My job configuration:
package com.sample.oracletomongo.batchconfig;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.data.MongoItemWriter;
import org.springframework.batch.item.data.builder.MongoItemWriterBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.sample.oracletomongo.AdditionalBatchConfiguration;
import com.sample.oracletomongo.ColumnPartitioner;
import com.sample.oracletomongo.CustomParametersIncrementerImpl;
import com.sample.oracletomongo.RowMapper.Sample_RowMapper;
import com.sample.oracletomongo.bean.Sample;
@Configuration
@EnableBatchProcessing
@Import(AdditionalBatchConfiguration.class)
public class Sample_BatchConfig {

    @Autowired
    public DataSource datasource;

    @Autowired
    public MongoTemplate mongotemplate;

    @Autowired
    public JdbcTemplate jdbctemplate;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    public List<String> keycolumns() {
        List<String> keycolumn = new ArrayList<String>();
        keycolumn.add("NO");
        keycolumn.add("TYPE");
        return keycolumn;
    }

    public List<String> nonkeycolumns() {
        List<String> nonkeycolumns = new ArrayList<String>();
        nonkeycolumns.add("DATE");
        return nonkeycolumns;
    }

    @Bean
    @StepScope
    public ColumnPartitioner itemmeasurelocalpartitioner(
            @Value("#{jobParameters['deltadate']}") String deltadate,
            @Value("#{jobParameters['rundate']}") String rundate) {
        ColumnPartitioner columnRangePartitioner = new ColumnPartitioner();
        columnRangePartitioner.setKeycolumn(keycolumns());
        columnRangePartitioner.setNonkeycolumn(nonkeycolumns());
        columnRangePartitioner.setJdbctemplate(jdbctemplate);
        columnRangePartitioner.setTable("Sample");
        columnRangePartitioner.setDeltadate(deltadate);
        columnRangePartitioner.setRundate(rundate);
        return columnRangePartitioner;
    }

    @Bean
    @StepScope
    public JdbcCursorItemReader<Sample> itemReader(
            @Value("#{stepExecutionContext[rangeSql]}") String rangesql) {
        System.out.println("RangeSQL" + rangesql);
        return new JdbcCursorItemReaderBuilder<Sample>()
                .dataSource(datasource) // change accordingly if you use another data source
                .name("Reader")
                .sql(rangesql)
                .rowMapper(new Sample_RowMapper())
                .build();
    }

    @Bean
    public MongoItemWriter<Sample> writerlocal(MongoTemplate mongoTemplate) {
        return new MongoItemWriterBuilder<Sample>()
                .template(mongoTemplate)
                .collection("Sample")
                .build();
    }

    @Bean
    public Job sampleJob() {
        System.out.println("DS : " + datasource);
        return jobBuilderFactory.get("sample")
                .incrementer(new CustomParametersIncrementerImpl(datasource))
                .start(masterstep())
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor1() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(64);
        executor.setMaxPoolSize(64);
        executor.setQueueCapacity(64);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("MultiThreaded1-");
        return executor;
    }

    // Master step: partitions the table into row ranges and runs the slave
    // step on each partition in parallel.
    @Bean
    public Step masterstep() {
        return stepBuilderFactory.get("masterstep")
                .partitioner("ItemAttributeslavestepSlave", itemmeasurelocalpartitioner(null, null))
                .step(slavestep())
                .gridSize(4)
                .taskExecutor(taskExecutor1())
                .build();
    }

    // Slave step: reads one row range from Oracle and writes it to Mongo.
    @Bean
    public Step slavestep() {
        return stepBuilderFactory.get("slavestep")
                .<Sample, Sample>chunk(90)
                .reader(itemReader(null))
                .writer(writerlocal(mongotemplate))
                .build();
    }
}
package com.sample.oracletomongo;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.Assert;
public class ColumnPartitioner implements Partitioner {

    private JdbcTemplate jdbctemplate;
    private String table;
    private List<String> keycolumn;
    private List<String> nonkeycolumn;
    private String deltadate;
    private String rundate;

    public List<String> getNonkeycolumn() {
        return nonkeycolumn;
    }

    public void setNonkeycolumn(List<String> nonkeycolumn) {
        this.nonkeycolumn = nonkeycolumn;
    }

    public String getDeltadate() {
        return deltadate;
    }

    public void setDeltadate(String deltadate) {
        this.deltadate = deltadate;
    }

    public String getRundate() {
        return rundate;
    }

    public void setRundate(String rundate) {
        this.rundate = rundate;
    }

    public JdbcTemplate getJdbctemplate() {
        return jdbctemplate;
    }

    public void setJdbctemplate(JdbcTemplate jdbctemplate) {
        this.jdbctemplate = jdbctemplate;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public List<String> getKeycolumn() {
        return keycolumn;
    }

    public void setKeycolumn(List<String> keycolumn) {
        this.keycolumn = keycolumn;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        // Count the rows in the delta window; the dates come from job
        // parameters, so they are concatenated into the SQL directly.
        int totalRecords = jdbctemplate.queryForObject(
                "select count(1) from " + table + " where DATE between TO_DATE('" + getDeltadate()
                        + "','YYYY-MM-DD HH24:MI:SS') and TO_DATE('" + getRundate()
                        + "','YYYY-MM-DD HH24:MI:SS')", Integer.class);
        int noOfRecordsPerRange = totalRecords / gridSize;
        if (totalRecords >= gridSize) {
            for (int gridNo = 1; gridNo <= gridSize; gridNo++) {
                ExecutionContext value = new ExecutionContext();
                String recordRange = getRecordOffset(gridSize, gridNo, noOfRecordsPerRange, totalRecords);
                value.put("rowCountRange", recordRange);
                System.out.println("Processing from : " + recordRange.split("#")[0]
                        + " to " + recordRange.split("#")[1]);
                // give each thread a name: Thread1, Thread2, Thread3, ...
                value.putString("name", "Thread" + gridNo);
                String sql = frameRangeQuery(recordRange, table);
                value.putString("rangeSql", sql);
                result.put("partition" + gridNo, value);
            }
        } else {
            // Fewer records than grids (e.g. totalRecords = 1, gridSize = 5):
            // fall back to a single partition covering everything.
            ExecutionContext value = new ExecutionContext();
            int gridNo = 1;
            String recordRange = "0#" + totalRecords;
            value.put("rowCountRange", recordRange);
            System.out.println("Processing from : " + recordRange.split("#")[0]
                    + " to " + recordRange.split("#")[1]);
            value.putString("name", "Thread" + gridNo);
            String sql = frameRangeQuery(recordRange, table);
            value.putString("rangeSql", sql);
            result.put("partition" + gridNo, value);
        }
        return result;
    }

    /*
     * The range returned is meant for queries using the OFFSET ... ROWS FETCH
     * NEXT ... ROWS ONLY clause, e.g. OFFSET 0 ROWS FETCH NEXT 10 ROWS ONLY,
     * OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY, and so on.
     */
    public static String getRecordOffset(int gridSize, int gridNo,
            int noOfRecordsPerRange, int totalRecords) {
        String recordRange;
        int offset;
        if (gridNo == 1)
            offset = 0;
        else
            offset = noOfRecordsPerRange * (gridNo - 1);
        if (gridNo == gridSize) // send all the remaining records to the last batch
            recordRange = offset + "#" + (totalRecords - offset);
        else
            recordRange = offset + "#" + noOfRecordsPerRange;
        return recordRange;
    }

    public String frameRangeQuery(String rowCountRange, String tableName) {
        Assert.notNull(tableName, "tableName must not be null");
        Assert.notNull(rowCountRange, "rowCountRange must not be null");
        StringBuffer rangeQuery = new StringBuffer("SELECT ");
        StringBuffer orderByClause = new StringBuffer(" order by ");
        StringBuffer outerQuery = new StringBuffer(" SELECT * FROM ( ");
        StringBuffer whereclause = new StringBuffer();
        if (!rowCountRange.isEmpty()) {
            String[] range = rowCountRange.split("#");
            Integer numOfRows = Integer.valueOf(range[1]);
            Integer offset = Integer.valueOf(range[0]);
            StringBuffer keyColumns = new StringBuffer();
            StringBuffer nonkeycolumns = new StringBuffer();
            for (Iterator<String> iter = getKeycolumn().iterator(); iter.hasNext();) {
                keyColumns.append(iter.next());
                if (iter.hasNext()) {
                    keyColumns.append(", ");
                } else {
                    keyColumns.append(" ");
                }
            }
            for (Iterator<String> iter = getNonkeycolumn().iterator(); iter.hasNext();) {
                nonkeycolumns.append(iter.next());
                if (iter.hasNext()) {
                    nonkeycolumns.append(", ");
                } else {
                    nonkeycolumns.append(" ");
                }
            }
            rangeQuery.append(keyColumns + ", " + nonkeycolumns);
            orderByClause.append(keyColumns);
            whereclause.append("WHERE UPD_DATE between ");
            whereclause.append("TO_DATE('" + deltadate + "','YYYY-MM-DD HH24:MI:SS') AND ");
            whereclause.append("TO_DATE('" + rundate + "','YYYY-MM-DD HH24:MI:SS') ");
            rangeQuery.append(" FROM " + tableName + " " + whereclause + orderByClause);
            outerQuery.append(rangeQuery + ") e OFFSET " + offset + " ROWS FETCH NEXT " + numOfRows + " ROWS ONLY");
        }
        System.out.println("Range Query:" + outerQuery.toString());
        return outerQuery.toString();
    }
}
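To make the partition arithmetic concrete, here is a small worked example with made-up numbers (10 records, gridSize = 4): each grid gets totalRecords / gridSize = 2 rows, and the last grid absorbs the remainder.

package com.sample.oracletomongo;

// Demo of the offset#rowCount ranges produced by getRecordOffset();
// the record count and grid size are illustrative values only.
public class ColumnPartitionerDemo {
    public static void main(String[] args) {
        int totalRecords = 10;
        int gridSize = 4;
        int perRange = totalRecords / gridSize; // 2
        for (int gridNo = 1; gridNo <= gridSize; gridNo++) {
            System.out.println("partition" + gridNo + " -> "
                    + ColumnPartitioner.getRecordOffset(gridSize, gridNo, perRange, totalRecords));
        }
        // Prints 0#2, 2#2, 4#2 and 6#4; the last range maps to
        // OFFSET 6 ROWS FETCH NEXT 4 ROWS ONLY in the generated SQL.
    }
}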
Additional batch configuration:
package com.sample.oracletomongo;

import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class AdditionalBatchConfiguration {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private JobRegistry jobRegistry;

    @Autowired
    private JobExplorer jobExplorer;

    public ThreadPoolTaskExecutor createThreadPoolExecutor() {
        ThreadPoolTaskExecutor t1 = new ThreadPoolTaskExecutor();
        t1.setMaxPoolSize(30);
        t1.setQueueCapacity(30);
        t1.setCorePoolSize(20);
        t1.initialize();
        return t1;
    }

    // Asynchronous launcher so that startNextInstance() returns immediately
    // instead of blocking until the job finishes.
    public JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(createThreadPoolExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
        jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
        return jobRegistryBeanPostProcessor;
    }

    @Bean
    public JobOperator jobOperator() throws Exception {
        SimpleJobOperator jobOperator = new SimpleJobOperator();
        jobOperator.setJobExplorer(jobExplorer);
        jobOperator.setJobLauncher(createJobLauncher());
        jobOperator.setJobRegistry(jobRegistry);
        jobOperator.setJobRepository(jobRepository);
        return jobOperator;
    }
}
The API endpoint:
package com.sample.oracletomongo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Controller {

    @Autowired
    private JobOperator jobOperator;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private JobRegistry jobregistry;

    @Autowired
    public DataSource datasource;

    @GetMapping("/Run/AllJobs")
    public Map<String, Long> triggerAllJobs() throws NoSuchJobException {
        Long executionId = null;
        BatchStatus status = null;
        // Trigger all the jobs at once: collect every registered job whose
        // name starts with "Mongo".
        List<String> jobnames = new ArrayList<String>();
        for (String names : jobregistry.getJobNames()) {
            if (names.startsWith("Mongo")) {
                jobnames.add(names);
                // The parameters are only logged here; startNextInstance()
                // recomputes them via the job's incrementer.
                Job job = jobregistry.getJob(names);
                JobParameters parameters = new JobParametersBuilder(jobExplorer)
                        .getNextJobParameters(job)
                        .toJobParameters();
                System.out.println("Jobnames :" + names + " parameters :" + parameters.toString());
            }
        }
        System.out.println("After JobNames:" + Collections.singletonList(jobnames));
        Map<String, Long> result = new HashMap<String, Long>();
        for (String jobs : jobnames) {
            try {
                executionId = jobOperator.startNextInstance(jobs);
                result.put(jobs, executionId);
                status = jobExplorer.getJobExecution(executionId).getStatus();
                System.out.println("status" + status);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
        System.out.println(Collections.singletonList(result));
        return result;
    }
}
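For reference, calling GET /Run/AllJobs returns the map of job name to execution id, e.g. {"MongoSampleJob": 123} (made-up values). Because the JobOperator is wired with the asynchronous launcher from AdditionalBatchConfiguration, the request returns once the jobs have been started, so the statuses printed in the loop are typically still STARTING or STARTED.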