java - 如何使弹簧批处理步骤执行与可配置的线程数并行?
问题描述
我有以下 spring-batch 应用程序
SpringBatchApplication.java
package com.spbt.job.sample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBatchApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchApplication.class, args);
}
}
TraverseJob.java
package com.spbt.job.sample;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class TraverseJob {
@Autowired
protected JobBuilderFactory jobBuilderFactory;
@Autowired
protected StepBuilderFactory stepBuilderFactory;
private String inputFolderPath = "/tmp/inputFolder";
@Bean("TraverseJob")
public Job job() {
return jobBuilderFactory.get("TraverseJob")
.incrementer(new RunIdIncrementer())
.start(traverseStep())
.build();
}
@Bean("TraverseStep")
public Step traverseStep() {
return stepBuilderFactory.get("TraverseStep")
.tasklet(traverseJobTasklet(null))
.build();
}
@Bean("TraverseJobTasklet")
@StepScope
public TraverseJobTasklet traverseJobTasklet(@Value("#{jobParameters[date]}") String date) {
TraverseJobTasklet tasklet = new TraverseJobTasklet();
tasklet.setJobDate(date);
tasklet.setJobDirPath(inputFolderPath);
return tasklet;
}
}
TraverseJobTasklet.java
package com.spbt.job.sample;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.File;
public class TraverseJobTasklet implements Tasklet {
private String jobDirPath;
private String jobDate;
@Autowired
private RemoteFilePush remoteFilePush;
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
try {
traverseDir(new File(jobDirPath));
} catch (Exception ex) {
throw ex;
}
return RepeatStatus.FINISHED;
}
private void traverseDir(File filePath) throws Exception {
try {
File[] files = filePath.listFiles();
if (files != null) {
for (File file : files) {
String name = file.getName();
if (file.isDirectory()) {
if (remoteFilePush.isRemoteDirExist(name)) {
continue;
} else {
remoteFilePush.createRemoteDir(name);
traverseDir(file);
}
} else {
remoteFilePush.pushFile(file.getPath());
}
}
} else {
throw new Exception("empty/null dir -> " + filePath.getName());
}
} catch (Exception ex) {
throw ex;
}
}
public String getJobDirPath() {
return jobDirPath;
}
public void setJobDirPath(String jobDirPath) {
this.jobDirPath = jobDirPath;
}
public String getJobDate() {
return jobDate;
}
public void setJobDate(String jobDate) {
this.jobDate = jobDate;
}
}
RemoteFilePushLogic.java
package com.spbt.job.sample;
import org.springframework.stereotype.Component;
@Component
public class RemoteFilePush {
public boolean isRemoteDirExist(String name) throws InterruptedException {
boolean isRemoteDirExist = false;
// code to check dir on remote server
return isRemoteDirExist;
}
public void createRemoteDir(String name) throws InterruptedException {
// code to create dir on remote server
}
public void pushFile(String path) throws InterruptedException {
// code to push file on remote server
System.out.println("Pushed");
}
}
我想在 TraverseJobTasklet 的 traverseDir 方法中进行并行遍历和执行,通过保持我的 RemoteFilePush 逻辑完整,我的 inputFolderPath 可以包含多个子目录,每个子目录都包含一些文件。
我试图关注我正在使用的 spring-batch 版本的链接,但它基于 xml,我似乎不明白如何从我拥有的单个 traverseStep 中创建多个步骤?
解决方案
每个工作步骤输入一个子文件夹字符串路径是我用弹簧代码撞墙的地方,如果你能指出我一些参考。这会很有帮助,网上的大多数示例都是基于 xml 的。
这是一个带有 Java 配置的快速独立示例:
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@Configuration
@EnableBatchProcessing
public class PartitionJobSample {
private final JobBuilderFactory jobs;
private final StepBuilderFactory steps;
public PartitionJobSample(JobBuilderFactory jobs, StepBuilderFactory steps) {
this.jobs = jobs;
this.steps = steps;
}
@Bean
public Step managerStep() {
return steps.get("masterStep")
.partitioner(workerStep().getName(), partitioner(null))
.step(workerStep())
.gridSize(4)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public SimpleAsyncTaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();// TODO useful for testing, use a more robust task executor in production
}
@Bean
@StepScope
public Partitioner partitioner(@Value("#{jobParameters['rootFolder']}") String rootFolder) {
List<String> subFolders = getSubFolders(rootFolder);
return new Partitioner() {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
for (String folder : subFolders) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.put("filePath", folder);
map.put("partition-for-" + folder, executionContext);
}
return map;
}
};
}
private List<String> getSubFolders(String rootFolder) {
// TODO implement this
return Arrays.asList("/data/folder1", "/data/folder2");
}
@Bean
public Step workerStep() {
return steps.get("workerStep")
.tasklet(getTasklet(null))
.build();
}
@Bean
@StepScope
public Tasklet getTasklet(@Value("#{stepExecutionContext['filePath']}") String filePath) {
return new TraverseJobTasklet(filePath);
}
@Bean
public Job job() {
return jobs.get("job")
.start(managerStep())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(PartitionJobSample.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
JobParameters jobParameters = new JobParametersBuilder()
.addString("rootFolder", "/data")
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
class TraverseJobTasklet implements Tasklet {
private String filePath;
public TraverseJobTasklet(String filePath) {
this.filePath = filePath;
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
// TODO call traversePath for filePath which is a sub-folder here
System.out.println(Thread.currentThread().getName() + " processing sub-folder " + filePath);
return RepeatStatus.FINISHED;
}
}
}
它将根目录作为作业参数传递,并执行一个分区步骤,每个工作人员处理一个子文件夹(调用你的 tasklet)。
如果你运行它,你应该会看到如下内容:
SimpleAsyncTaskExecutor-2 processing sub-folder /data/folder1
SimpleAsyncTaskExecutor-1 processing sub-folder /data/folder2
我会让你相应地适应你的情况。
推荐阅读
- javascript - 如何为整个站点创建 ajax 函数?
- javascript - 角反应形式 - 复选框 - 使用现有数组初始化 formArray
- javascript - JavaScript 枚举关键字
- flutter - 无论页面如何都显示 Global SnackBar
- highcharts - 我的 highcharts 代码有什么问题?
- c++ - 如何从 C/C++ 程序在 Linux 终端上运行命令
- python - 图像显示在 tkinter 的错误屏幕上
- c++ - 在priority_queue的情况下运算符重载如何工作?
- c++ - Visual Studio - 静态库不会链接
- c# - IISReset 正在覆盖应用程序池状态