首页 > 解决方案 > 如何让 Spring Batch 的步骤以可配置的线程数并行执行?

问题描述

我有以下 Spring Batch 应用程序:

SpringBatchApplication.java

package com.spbt.job.sample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the sample batch application.
 */
@SpringBootApplication
public class SpringBatchApplication {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments, passed through unchanged to {@link SpringApplication}
     */
    public static void main(final String[] args) {
        final Class<?> primarySource = SpringBatchApplication.class;
        SpringApplication.run(primarySource, args);
    }
}

TraverseJob.java

package com.spbt.job.sample;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Batch configuration that defines {@code TraverseJob}: one tasklet step that walks a fixed
 * local input folder.
 */
@Configuration
@EnableBatchProcessing
public class TraverseJob {

    /** Local folder handed to the tasklet as the root of its traversal. */
    private static final String INPUT_FOLDER_PATH = "/tmp/inputFolder";

    @Autowired
    protected JobBuilderFactory jobBuilderFactory;

    @Autowired
    protected StepBuilderFactory stepBuilderFactory;

    /**
     * The job itself: starts with (and consists only of) {@code TraverseStep}, with a
     * {@link RunIdIncrementer} attached as its parameters incrementer.
     */
    @Bean("TraverseJob")
    public Job job() {
        Step onlyStep = traverseStep();
        return jobBuilderFactory.get("TraverseJob")
                .incrementer(new RunIdIncrementer())
                .start(onlyStep)
                .build();
    }

    /**
     * The single step of the job, which runs the traversal tasklet.
     */
    @Bean("TraverseStep")
    public Step traverseStep() {
        // The null argument is a placeholder: the tasklet bean is @StepScope, so the real
        // "date" value is injected from the job parameters when the step executes.
        TraverseJobTasklet stepTasklet = traverseJobTasklet(null);
        return stepBuilderFactory.get("TraverseStep").tasklet(stepTasklet).build();
    }

    /**
     * Step-scoped traversal tasklet configured with the job date and the input folder.
     *
     * @param date value of the {@code date} job parameter, resolved per step execution
     * @return a tasklet rooted at the input folder
     */
    @Bean("TraverseJobTasklet")
    @StepScope
    public TraverseJobTasklet traverseJobTasklet(@Value("#{jobParameters[date]}") String date) {
        TraverseJobTasklet traverseTasklet = new TraverseJobTasklet();
        traverseTasklet.setJobDirPath(INPUT_FOLDER_PATH);
        traverseTasklet.setJobDate(date);
        return traverseTasklet;
    }
}

TraverseJobTasklet.java

package com.spbt.job.sample;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.File;

/**
 * Tasklet that walks a local directory tree and mirrors it to a remote server via
 * {@link RemoteFilePush}.
 *
 * <p>For each entry directly under a directory:
 * <ul>
 *   <li>a sub-directory that does not exist remotely is created remotely and then traversed
 *       recursively;</li>
 *   <li>a sub-directory that already exists remotely is skipped together with everything below it;</li>
 *   <li>a regular file is pushed.</li>
 * </ul>
 *
 * <p>The traversal is sequential and runs on the thread that executes the step.
 */
public class TraverseJobTasklet implements Tasklet {

    /** Local root directory to traverse; must be set before the step runs. */
    private String jobDirPath;

    /** Value of the {@code date} job parameter; stored here but not used by the traversal. */
    private String jobDate;

    @Autowired
    private RemoteFilePush remoteFilePush;

    /**
     * Traverses {@code jobDirPath} once and reports the tasklet as finished.
     *
     * @return {@link RepeatStatus#FINISHED} after a complete traversal
     * @throws IllegalStateException if {@code jobDirPath} was never set
     * @throws Exception any failure from the traversal or from {@link RemoteFilePush}, propagated
     *         unchanged so that the step fails
     */
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        if (jobDirPath == null) {
            // Fail with a clear message instead of an NPE from new File(null).
            throw new IllegalStateException("jobDirPath must be set before the tasklet runs");
        }
        traverseDir(new File(jobDirPath));
        return RepeatStatus.FINISHED;
    }

    /**
     * Recursively mirrors the entries of {@code filePath}.
     *
     * @param filePath local directory to process
     * @throws java.io.IOException if {@code filePath} cannot be listed (missing, not a directory,
     *         or an I/O error); an empty directory is not an error
     * @throws Exception anything thrown by {@link RemoteFilePush}
     */
    private void traverseDir(File filePath) throws Exception {
        File[] files = filePath.listFiles();
        // listFiles() signals failure with null; an empty directory yields an empty array instead.
        if (files == null) {
            throw new java.io.IOException("cannot list directory -> " + filePath.getPath());
        }
        for (File file : files) {
            String name = file.getName();
            if (file.isDirectory()) {
                // NOTE(review): only the leaf name is sent to the remote side, so nested directories
                // sharing a name map to the same remote directory — confirm this is intended.
                if (!remoteFilePush.isRemoteDirExist(name)) {
                    remoteFilePush.createRemoteDir(name);
                    traverseDir(file);
                }
                // An already-existing remote directory is skipped wholesale, presumably because an
                // earlier run mirrored it; its local contents are not re-checked.
            } else {
                remoteFilePush.pushFile(file.getPath());
            }
        }
    }


    public String getJobDirPath() {
        return jobDirPath;
    }

    public void setJobDirPath(String jobDirPath) {
        this.jobDirPath = jobDirPath;
    }

    public String getJobDate() {
        return jobDate;
    }

    public void setJobDate(String jobDate) {
        this.jobDate = jobDate;
    }
}

RemoteFilePush.java

package com.spbt.job.sample;

import org.springframework.stereotype.Component;

/**
 * Gateway for the remote side of the mirroring job.
 *
 * <p>All three operations are placeholders in this sample: the remote calls are not implemented.
 */
@Component
public class RemoteFilePush {

    /**
     * Reports whether a directory with the given name exists on the remote server.
     *
     * <p>Stub: always returns {@code false}, so every directory is treated as missing.
     *
     * @param name directory name to look up
     * @return {@code false} until the remote check is implemented
     */
    public boolean isRemoteDirExist(String name) throws InterruptedException {
        // TODO query the remote server
        return false;
    }

    /**
     * Creates a directory with the given name on the remote server.
     *
     * <p>Stub: currently does nothing.
     *
     * @param name directory name to create
     */
    public void createRemoteDir(String name) throws InterruptedException {
        // TODO create the directory remotely
    }

    /**
     * Uploads the local file at {@code path} to the remote server.
     *
     * <p>Stub: only prints a confirmation line to standard output.
     *
     * @param path local path of the file to upload
     */
    public void pushFile(String path) throws InterruptedException {
        // TODO transfer the file remotely
        System.out.println("Pushed");
    }
}

我想让 TraverseJobTasklet 的 traverseDir 方法并行地遍历并处理目录，同时保持 RemoteFilePush 的逻辑不变。我的 inputFolderPath 下可以有多个子目录，每个子目录中都包含一些文件。

我尝试参考了与我所用 spring-batch 版本对应的文档链接，但其中的示例是基于 XML 配置的，我不明白如何从现有的单个 traverseStep 派生出多个（并行执行的）步骤。

标签: java, spring, spring-boot, spring-batch

解决方案


“如何为每个工作步骤传入一个子文件夹路径（字符串），正是我在编写 Spring 代码时卡住的地方。如果你能给我一些参考资料就太好了，网上的大多数示例都是基于 XML 的。”

下面是一个使用 Java 配置、可独立运行的简单示例:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class PartitionJobSample {

    private final JobBuilderFactory jobs;
    private final StepBuilderFactory steps;

    public PartitionJobSample(JobBuilderFactory jobs, StepBuilderFactory steps) {
        this.jobs = jobs;
        this.steps = steps;
    }

    @Bean
    public Step managerStep() {
        return steps.get("masterStep")
                .partitioner(workerStep().getName(), partitioner(null))
                .step(workerStep())
                .gridSize(4)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public SimpleAsyncTaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();// TODO useful for testing, use a more robust task executor in production
    }

    @Bean
    @StepScope
    public Partitioner partitioner(@Value("#{jobParameters['rootFolder']}") String rootFolder) {
        List<String> subFolders = getSubFolders(rootFolder);
        return new Partitioner() {
            @Override
            public Map<String, ExecutionContext> partition(int gridSize) {
                Map<String, ExecutionContext> map = new HashMap<>(gridSize);
                for (String folder : subFolders) {
                    ExecutionContext executionContext = new ExecutionContext();
                    executionContext.put("filePath", folder);
                    map.put("partition-for-" + folder, executionContext);
                }
                return map;
            }
        };
    }

    private List<String> getSubFolders(String rootFolder) {
        // TODO implement this
        return Arrays.asList("/data/folder1", "/data/folder2");
    }

    @Bean
    public Step workerStep() {
        return steps.get("workerStep")
                .tasklet(getTasklet(null))
                .build();
    }

    @Bean
    @StepScope
    public Tasklet getTasklet(@Value("#{stepExecutionContext['filePath']}") String filePath) {
        return new TraverseJobTasklet(filePath);
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(managerStep())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(PartitionJobSample.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("rootFolder", "/data")
                .toJobParameters();
        jobLauncher.run(job, jobParameters);
    }

    class TraverseJobTasklet implements Tasklet {

        private String filePath;

        public TraverseJobTasklet(String filePath) {
            this.filePath = filePath;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            // TODO call traversePath for filePath which is a sub-folder here
            System.out.println(Thread.currentThread().getName() + " processing sub-folder " + filePath);
            return RepeatStatus.FINISHED;
        }
    }

}

该示例将根目录作为作业参数传入，并执行一个分区（partitioned）步骤：每个工作步骤（worker step）处理一个子文件夹（在其中调用你的 tasklet）。各分区在 managerStep 所配置的 taskExecutor 上并行执行，因此并发线程数由该执行器决定。

运行它之后，你应该会看到类似如下的输出（线程名和输出顺序可能不同）:

SimpleAsyncTaskExecutor-2 processing sub-folder /data/folder1
SimpleAsyncTaskExecutor-1 processing sub-folder /data/folder2

请根据你的实际情况对示例做相应调整。


推荐阅读