首页 > 解决方案 > Spring Batch - 自定义作业 - 动态传递分区文件名

问题描述

我正在尝试构建一个spring批处理应用程序,其中批处理作业是动态构建的(不是spring托管bean)并使用JobLauncher启动。该作业是基于源文件和一些其他信息(例如目标存储等)构建的...基于这些详细信息,我必须使用相应的读取器/写入器构建一个作业。

我能够成功构建和启动同步以及多线程作业。我正在尝试扩展应用程序以使用分区 SPI 处理大文件。但我无法找到将正确分区传递给步骤的方法。

在普通应用程序中,由于使用了 @StepScope 注解,Spring 会为每次 Step 执行创建一个单独的读取器;后期绑定(@Value)则负责把 StepExecution 中的信息(filePath)传递给读取器。

有什么方法可以在不使用 Step 范围的情况下实现我的用例?

​class CustomJobBuilder {
    ​//JobInfo contains table name, source file etc...

    ​Job build(JobInfo jobInfo) throws Exception {
      return jobBuilderFactory
          .get(jobInfo.getName())
          .start(masterStep())
          .build();
    }


  private Step masterStep() throws Exception {
    Step importFileStep = importFileStep();
    return stepBuilderFactory
        .get("masterStep")
        .partitioner(importFileStep.getName(), partitioner())
        .step(importFileStep)
        .gridSize(6)
        .taskExecutor(new SimpleAsyncTaskExecutor())
        .build();
  }

  private MultiResourcePartitioner partitioner() throws IOException {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    partitioner.setKeyName(PARTITION_KEY_NAME);
    ResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();
    partitioner.setResources(patternResolver.getResources(jobInfo.getFilePath())); //*.csv
    return partitioner;
  }

  private Step importFileStep() throws Exception {
    JdbcBatchItemWriter<Row> successRecordsWriter = dbWriter();
    FlatFileItemWriter<Row> failedRecordsWriter = errorWriter();
    return stepBuilderFactory
        .get("importFile")
        .<Row, Row>chunk(CHUNK_SIZE)
        .reader(csvReader(null))
        .processor(processor())
        .writer(writer(successRecordsWriter, failedRecordsWriter))
        .stream(failedRecordsWriter)
        .build();
  }

  //Problem here. Passing filePath to CSV Reader dynamically
  private ItemReader<Row> csvReader(@Value("#{stepExecutionContext['" + PARTITION_KEY_NAME + "']}") String filePath) {
    DefaultLineMapper<Row> lineMapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames(jobInfo.getColumns());
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new CustomFieldSetMapper(jobInfo.getColumns()));
    lineMapper.afterPropertiesSet();

    FlatFileItemReader<Row> reader = new FlatFileItemReader<>();
    reader.setLinesToSkip(1);
    reader.setResource(new FileSystemResource(filePath));
    reader.setLineMapper(lineMapper);
    return reader;
  }
​}

​class CustomJobLauncher {

    JobParameters jobParameters = new JobParametersBuilder()
        .addString("id", UUID.randomUUID().toString())
        .toJobParameters();
    JobExecution jobExecution;
    try {
      CustomJobBuilder jobBuilder = new CustomJobBuilder();
      jobBuilder.setJobBuilderFactory(jobBuilderFactory);
      jobBuilder.setDataSource(getDataSource(objectDto.getDataStore()));
      jobBuilder.setStepBuilderFactory(stepBuilderFactory);

      jobExecution = jobLauncher.run(jobBuilder.build(jobInfo), jobParameters);
      jobExecution.getAllFailureExceptions().forEach(Throwable::printStackTrace);
    } catch (Exception e) {
      LOGGER.error("Failed", e);
    }
}

标签: java, spring, spring-batch

解决方案


我通过模仿 MessageChannelRemotePartitionHandler 和 StepExecutionRequestHandler 解决了这个问题。

我没有依赖 BeanFactoryStepLocator 从 beanFactory 获取 step,而是在 slave 上重新构建了 step 并执行了它。

您必须谨慎地构建新步骤:它在所有从节点(slave)上必须完全相同,否则会导致处理/写入结果不一致。

// PartitionHandler - handle method
/**
 * Splits the master step execution into partitions and sends one
 * {@code PartitionExecutionRequest} message per partition through the messaging gateway.
 *
 * <p>Each request carries the job execution id, the partition's step execution id, the current
 * request info, and the {@code jobInfo} / {@code object} values held by this handler.
 *
 * @param stepExecutionSplitter splitter used to create the partition step executions
 * @param masterStepExecution   execution of the master (partitioning) step
 * @return the partition step executions, obtained either from the reply channel or by polling,
 *         depending on {@code pollRepositoryForResults}; an empty collection when the splitter
 *         produced no partitions
 * @throws Exception if splitting, sending or collecting the results fails
 */
public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter,
                                          final StepExecution masterStepExecution) throws Exception {

    final Set<StepExecution> split = stepExecutionSplitter.split(masterStepExecution, gridSize);

    if (CollectionUtils.isEmpty(split)) {
      // Nothing to dispatch: return an empty collection so callers never have to null-check.
      return java.util.Collections.emptySet();
    }

    // Sequence number of each request within the batch of split.size() requests.
    int count = 0;

    for (StepExecution stepExecution : split) {
      Message<PartitionExecutionRequest> request = createMessage(count++, split.size(),
          new PartitionExecutionRequest(stepExecution.getJobExecutionId(), stepExecution.getId(), RequestContextProvider.getRequestInfo(), jobInfo, object),
          replyChannel);
      if (logger.isDebugEnabled()) {
        logger.debug("Sending request: " + request);
      }
      messagingGateway.send(request);
    }

    if (pollRepositoryForResults) {
      return pollReplies(masterStepExecution, split);
    }
    return receiveReplies(replyChannel);
}

//On the slave
@MessageEndpoint
public class PartitionExecutionRequestHandler {

  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionExecutionRequestHandler.class);
  private BatchBeanProvider batchBeanProvider;

  public void setBatchBeanProvider(BatchBeanProvider batchBeanProvider) {
    this.batchBeanProvider = batchBeanProvider;
  }


  @ServiceActivator
  public StepExecution handle(PartitionExecutionRequest request) {
    StepExecution stepExecution = null;
    try {
      before(request);
      Long jobExecutionId = request.getJobExecutionId();
      Long stepExecutionId = request.getStepExecutionId();
      stepExecution = batchBeanProvider.getJobExplorer().getStepExecution(jobExecutionId, stepExecutionId);
      if (stepExecution == null) {
        throw new NoSuchStepException("No StepExecution could be located for this request: " + request);
      }
      try {
        CustomJobCreator jobCreator = new CustomJobCreator(batchBeanProvider, request.getJobInfo(), request.getObject());
        jobCreator.afterPropertiesSet();
        ResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();
        Resource resource = patternResolver.getResource(stepExecution.getExecutionContext().getString(CustomJobCreator.PARTITION_KEY_NAME));
        Step step = jobCreator.partitionStep(resource.getFile().getAbsolutePath());
        step.execute(stepExecution);
      } catch (JobInterruptedException e) {
        stepExecution.setStatus(BatchStatus.STOPPED);
        // The receiver should update the stepExecution in repository
      } catch (Throwable e) {
        stepExecution.addFailureException(e);
        stepExecution.setStatus(BatchStatus.FAILED);
        // The receiver should update the stepExecution in repository
      }
    }
    return stepExecution;
  }
}

推荐阅读