java - Spring Batch - 自定义作业 - 动态传递分区文件名
问题描述
我正在尝试构建一个spring批处理应用程序,其中批处理作业是动态构建的(不是spring托管bean)并使用JobLauncher启动。该作业是基于源文件和一些其他信息(例如目标存储等)构建的...基于这些详细信息,我必须使用相应的读取器/写入器构建一个作业。
我能够成功构建和启动同步以及多线程作业。我正在尝试扩展应用程序以使用分区 SPI 处理大文件。但我无法找到将正确分区传递给步骤的方法。
因为在正常应用程序中使用 StepScope 注释,所以 spring 为每个 Step 创建一个单独的阅读器。后期绑定 (@Value) 有助于将 StepExecution (filePath) 信息传递给阅读器。
有什么方法可以在不使用 Step 范围的情况下实现我的用例?
class CustomJobBuilder {
//JobInfo contains table name, source file etc...
Job build(JobInfo jobInfo) throws Exception {
return jobBuilderFactory
.get(jobInfo.getName())
.start(masterStep())
.build();
}
private Step masterStep() throws Exception {
Step importFileStep = importFileStep();
return stepBuilderFactory
.get("masterStep")
.partitioner(importFileStep.getName(), partitioner())
.step(importFileStep)
.gridSize(6)
.taskExecutor(new SimpleAsyncTaskExecutor())
.build();
}
private MultiResourcePartitioner partitioner() throws IOException {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setKeyName(PARTITION_KEY_NAME);
ResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();
partitioner.setResources(patternResolver.getResources(jobInfo.getFilePath())); //*.csv
return partitioner;
}
private Step importFileStep() throws Exception {
JdbcBatchItemWriter<Row> successRecordsWriter = dbWriter();
FlatFileItemWriter<Row> failedRecordsWriter = errorWriter();
return stepBuilderFactory
.get("importFile")
.<Row, Row>chunk(CHUNK_SIZE)
.reader(csvReader(null))
.processor(processor())
.writer(writer(successRecordsWriter, failedRecordsWriter))
.stream(failedRecordsWriter)
.build();
}
//Problem here. Passing filePath to CSV Reader dynamically
private ItemReader<Row> csvReader(@Value("#{stepExecutionContext['" + PARTITION_KEY_NAME + "']}") String filePath) {
DefaultLineMapper<Row> lineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(jobInfo.getColumns());
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new CustomFieldSetMapper(jobInfo.getColumns()));
lineMapper.afterPropertiesSet();
FlatFileItemReader<Row> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1);
reader.setResource(new FileSystemResource(filePath));
reader.setLineMapper(lineMapper);
return reader;
}
}
class CustomJobLauncher {
JobParameters jobParameters = new JobParametersBuilder()
.addString("id", UUID.randomUUID().toString())
.toJobParameters();
JobExecution jobExecution;
try {
CustomJobBuilder jobBuilder = new CustomJobBuilder();
jobBuilder.setJobBuilderFactory(jobBuilderFactory);
jobBuilder.setDataSource(getDataSource(objectDto.getDataStore()));
jobBuilder.setStepBuilderFactory(stepBuilderFactory);
jobExecution = jobLauncher.run(jobBuilder.build(jobInfo), jobParameters);
jobExecution.getAllFailureExceptions().forEach(Throwable::printStackTrace);
} catch (Exception e) {
LOGGER.error("Failed", e);
}
}
解决方案
我通过模仿 MessageChannelRemotePartitionHandler 和 StepExecutionRequestHandler 解决了这个问题。
我没有依赖 BeanFactoryStepLocator 从 beanFactory 获取 step,而是在 slave 上重新构建了 step 并执行了它。
您必须谨慎构建新步骤,因为它必须在所有从属设备上完全相同,否则会导致处理/写入不一致。
// PartitionHandler - partition method
public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter,
final StepExecution masterStepExecution) throws Exception {
final Set<StepExecution> split = stepExecutionSplitter.split(masterStepExecution, gridSize);
if(CollectionUtils.isEmpty(split)) {
return null;
}
int count = 0;
for (StepExecution stepExecution : split) {
Message<PartitionExecutionRequest> request = createMessage(count++, split.size(),
new PartitionExecutionRequest(stepExecution.getJobExecutionId(), stepExecution.getId(), RequestContextProvider.getRequestInfo(), jobInfo, object),
replyChannel);
if (logger.isDebugEnabled()) {
logger.debug("Sending request: " + request);
}
messagingGateway.send(request);
}
if(!pollRepositoryForResults) {
return receiveReplies(replyChannel);
}
else {
return pollReplies(masterStepExecution, split);
}
}
//On the slave
@MessageEndpoint
public class PartitionExecutionRequestHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionExecutionRequestHandler.class);
private BatchBeanProvider batchBeanProvider;
public void setBatchBeanProvider(BatchBeanProvider batchBeanProvider) {
this.batchBeanProvider = batchBeanProvider;
}
@ServiceActivator
public StepExecution handle(PartitionExecutionRequest request) {
StepExecution stepExecution = null;
try {
before(request);
Long jobExecutionId = request.getJobExecutionId();
Long stepExecutionId = request.getStepExecutionId();
stepExecution = batchBeanProvider.getJobExplorer().getStepExecution(jobExecutionId, stepExecutionId);
if (stepExecution == null) {
throw new NoSuchStepException("No StepExecution could be located for this request: " + request);
}
try {
CustomJobCreator jobCreator = new CustomJobCreator(batchBeanProvider, request.getJobInfo(), request.getObject());
jobCreator.afterPropertiesSet();
ResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();
Resource resource = patternResolver.getResource(stepExecution.getExecutionContext().getString(CustomJobCreator.PARTITION_KEY_NAME));
Step step = jobCreator.partitionStep(resource.getFile().getAbsolutePath());
step.execute(stepExecution);
} catch (JobInterruptedException e) {
stepExecution.setStatus(BatchStatus.STOPPED);
// The receiver should update the stepExecution in repository
} catch (Throwable e) {
stepExecution.addFailureException(e);
stepExecution.setStatus(BatchStatus.FAILED);
// The receiver should update the stepExecution in repository
}
}
return stepExecution;
}
}
推荐阅读
- python - 无法安装 Bert Serving 服务器。“找不到满足要求 bert-serving-server 的版本”
- mysql - MySQL查询以连接来自多个列的数据
- c# - 有没有办法可以使方法防弹,例如,如果使用空参数调用该方法,那么它会给出异常?
- tensorflow - 已安装 Tensorflow,但无法在 Python 上加载
- git - 是否可以在 git 中进行“混合”字差异?
- react-native-android - 无法从 react-native 中的 asyncStorage 获取价值
- azure - 每个订阅的单个消费计划可以创建多少个功能应用/插槽?
- sqlite - PRAGMA incremental_vacuum(1000) 只会删除一页,为什么?
- python - 赛车游戏:根据是否在赛道上改变汽车运动
- typescript - 如何在不导入库的情况下获得 VS Code / typescript 自动完成