Is it possible to partition across a single file in Spring Batch?

Problem Description

I've read about partitioning in spring-batch and found an example that demonstrates it. The example reads persons from a CSV file, does some processing, and inserts the data into a database. So in that example 1 partition = 1 file, and the partitioner implementation looks like this:

public class MultiResourcePartitioner implements Partitioner {

    private final Logger logger = LoggerFactory.getLogger(MultiResourcePartitioner.class);
    public static final String FILE_PATH = "filePath";

    private static final String PARTITION_KEY = "partition";

    private final Collection<Resource> resources;


    public MultiResourcePartitioner(Collection<Resource> resources) {
        this.resources = resources;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>(gridSize);
        int i = 0;
        for (Resource resource : resources) {
            ExecutionContext context = new ExecutionContext();
            context.putString(FILE_PATH, getPath(resource)); //Depends on what logic you want to use to split
            map.put(PARTITION_KEY + i++, context);
        }
        return map;
    }

    private String getPath(Resource resource) {
        try {
            return resource.getFile().getPath();
        } catch (IOException e) {
            logger.warn("Can't get file from from resource {}", resource);
            throw new RuntimeException(e);
        }
    }
}
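Each entry in the returned map becomes one execution of the slave step, and a step-scoped reader pulls the path back out of the step's ExecutionContext. A minimal sketch of such a reader bean, assuming the same Person class used by the example (the bean name here is made up):

@Bean
@StepScope
public FlatFileItemReader<Person> partitionedCsvReader(
        @Value("#{stepExecutionContext[filePath]}") String filePath) { // key written by the partitioner above
    return new FlatFileItemReaderBuilder<Person>()
            .name("partitionedCsvReader")
            .resource(new FileSystemResource(filePath))
            .delimited()
            .names(new String[]{"firstName", "lastName"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }})
            .build();
}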

But what if I have a single 10TB file? Does Spring Batch allow partitioning it somehow?

Update:

I tried the following to achieve what I want:

Make two steps: the first splits the file into pieces, and the second processes the pieces produced by the first:

@Configuration
public class SingleFilePartitionedJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private ToLowerCasePersonProcessor toLowerCasePersonProcessor;

    @Autowired
    private DbPersonWriter dbPersonWriter;

    @Autowired
    private ResourcePatternResolver resourcePatternResolver;

    @Value("${app.file-to-split}")
    private Resource resource;


    @Bean
    public Job splitFileProcessingJob() throws IOException {
        return jobBuilderFactory.get("splitFileProcessingJob")
                .incrementer(new RunIdIncrementer())
                .flow(splitFileIntoPiecesStep())
                .next(csvToDbLowercaseMasterStep())
                .end()
                .build();
    }

    private Step splitFileIntoPiecesStep() throws IOException {
        return stepBuilderFactory.get("splitFile")
                .tasklet(new FileSplitterTasklet(resource.getFile()))
                .build();
    }

    @Bean
    public Step csvToDbLowercaseMasterStep() throws IOException {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        partitioner.setResources(resourcePatternResolver.getResources("split/*.csv"));
        return stepBuilderFactory.get("csvReaderMasterStep")
                .partitioner("csvReaderMasterStep", partitioner)
                .gridSize(10)
                .step(csvToDataBaseSlaveStep())
                .taskExecutor(jobTaskExecutorSplitted())
                .build();
    }

    @Bean
    public Step csvToDataBaseSlaveStep() throws MalformedURLException {
        return stepBuilderFactory.get("csvToDatabaseStep")
                .<Person, Person>chunk(50)
                .reader(csvPersonReaderSplitted(null))
                .processor(toLowerCasePersonProcessor)
                .writer(dbPersonWriter)
                .build();

    }

    @Bean
    @StepScope
    public FlatFileItemReader<Person> csvPersonReaderSplitted(@Value("#{stepExecutionContext[fileName]}") String fileName) throws MalformedURLException {
        return new FlatFileItemReaderBuilder<Person>()
                .name("csvPersonReaderSplitted")
                .resource(new UrlResource(fileName))
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .build();

    }

    @Bean
    public TaskExecutor jobTaskExecutorSplitted() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(30);
        taskExecutor.setCorePoolSize(25);
        taskExecutor.setThreadNamePrefix("cust-job-exec2-");
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

}

The tasklet:

public class FileSplitterTasklet implements Tasklet {
    private final Logger logger = LoggerFactory.getLogger(FileSplitterTasklet.class);
    private File file;

    public FileSplitterTasklet(File file) {
        this.file = file;
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        int count = FileSplitter.splitTextFiles(file, 100);
        logger.info("File was split on {} files", count);
        return RepeatStatus.FINISHED;

    }
}

The logic that splits the file:

  public static int splitTextFiles(File bigFile, int maxRows) throws IOException {
        int fileCount = 1;
        // write every piece into the split/ directory that the step's resource
        // pattern points at (the original wrote the first piece next to the big
        // file and used a .txt suffix the split/*.csv pattern would never match)
        Path splitDir = Files.createDirectories(Paths.get("split"));
        try (BufferedReader reader = Files.newBufferedReader(bigFile.toPath())) {
            String line;
            int lineNum = 1;
            BufferedWriter writer = Files.newBufferedWriter(
                    splitDir.resolve(fileCount + "split.csv"), StandardOpenOption.CREATE);

            while ((line = reader.readLine()) != null) {

                if (lineNum > maxRows) {
                    writer.close();
                    lineNum = 1;
                    fileCount++;
                    writer = Files.newBufferedWriter(
                            splitDir.resolve(fileCount + "split.csv"), StandardOpenOption.CREATE);
                }

                writer.append(line);
                writer.newLine();
                lineNum++;
            }
            writer.close();
        }

        return fileCount;
    }
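Called directly it behaves like this (the input path is hypothetical):

// split the big CSV into 100-line pieces; returns the number of pieces written
int pieces = FileSplitter.splitTextFiles(new File("/data/persons.csv"), 100);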

So I put all the split files into a dedicated directory.

But this doesn't work: the resource pattern is resolved at context initialization time, when the split/ folder doesn't exist yet.
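The underlying problem is that resourcePatternResolver.getResources("split/*.csv") runs while the bean definitions are being evaluated, long before the split step executes. One possible fix (a sketch, untested here; the file: prefix is an assumption) is to defer the lookup by making the partitioner bean itself step-scoped:

@Bean
@StepScope
public MultiResourcePartitioner lazyPartitioner() throws IOException {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    // with @StepScope this bean is only created when the master step starts,
    // i.e. after splitFileIntoPiecesStep has written the pieces
    partitioner.setResources(resourcePatternResolver.getResources("file:split/*.csv"));
    return partitioner;
}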

Update

I came up with a workaround that works:

public class MultiResourcePartitionerWrapper implements Partitioner {
    private final MultiResourcePartitioner multiResourcePartitioner = new MultiResourcePartitioner();
    private final ResourcePatternResolver resourcePatternResolver;
    private final String pathPattern;

    public MultiResourcePartitionerWrapper(ResourcePatternResolver resourcePatternResolver, String pathPattern) {
        this.resourcePatternResolver = resourcePatternResolver;
        this.pathPattern = pathPattern;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        try {
            Resource[] resources = resourcePatternResolver.getResources(pathPattern);
            multiResourcePartitioner.setResources(resources);
            return multiResourcePartitioner.partition(gridSize);

        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
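Wired into the earlier configuration, the wrapper replaces the eagerly resolved partitioner; the file: prefix on the pattern is again an assumption:

@Bean
public Step csvToDbLowercaseMasterStep() throws MalformedURLException {
    return stepBuilderFactory.get("csvReaderMasterStep")
            .partitioner("csvReaderMasterStep",
                    new MultiResourcePartitionerWrapper(resourcePatternResolver, "file:split/*.csv"))
            .gridSize(10)
            .step(csvToDataBaseSlaveStep())
            .taskExecutor(jobTaskExecutorSplitted())
            .build();
}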

But it looks ugly. Is this a correct solution?

Tags: java, spring, multithreading, spring-batch, partitioning

Solution

Spring Batch allows you to partition, but how you do it is up to you.

You can simply split your 10TB file in your partitioner class (by a fixed number of pieces or by max rows), with each partition reading one split file. You can find plenty of examples of how to split a big file in Java, e.g. "Split very large text file by max rows".
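A sketch of that idea, reusing the question's FileSplitter helper (the class name, file layout, and piece naming are assumptions, not a Spring Batch API): the partitioner splits the big file when the master step runs, then hands one piece to each slave execution.

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class SplittingPartitioner implements Partitioner {

    private final File bigFile;
    private final int maxRowsPerPiece;

    public SplittingPartitioner(File bigFile, int maxRowsPerPiece) {
        this.bigFile = bigFile;
        this.maxRowsPerPiece = maxRowsPerPiece;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        try {
            // split here, at step execution time, so the pieces are
            // guaranteed to exist before any slave step reads them
            int count = FileSplitter.splitTextFiles(bigFile, maxRowsPerPiece);
            Map<String, ExecutionContext> map = new HashMap<>(count);
            for (int i = 1; i <= count; i++) {
                ExecutionContext context = new ExecutionContext();
                // same key a step-scoped reader binds to via
                // #{stepExecutionContext[fileName]}
                context.putString("fileName",
                        new File("split/" + i + "split.csv").toURI().toString());
                map.put("partition" + i, context);
            }
            return map;
        } catch (IOException e) {
            throw new RuntimeException("Could not split " + bigFile, e);
        }
    }
}

This also avoids the two-step timing problem from the question: since the split happens inside partition(), no resource pattern has to be resolved before the files exist.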

