java - 春季批量重启作业从初始阶段开始,而不是从中断的地方开始?
问题描述
我想实现重新启动作业的功能以从初始圣人开始它。我面临两个问题。
第一个问题:当我第一次重新启动作业时,它会创建一个新的作业实例 ID,并且表现得像一个新作业。在第二次,它将重新启动并使用相同的作业实例 ID 运行。(我从休息控制器发送了执行 ID)
第二个问题:当我重新启动它时,它将从初始阶段开始。
自定义阅读器:
package com.orange.alc.dabekdataload.reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.core.io.FileSystemResource;
import org.springframework.stereotype.Component;
import com.orange.alc.dabekdataload.constants.PostalHeader;
import com.orange.alc.dabekdataload.dto.PostalDto;
@Component("itemReader")
@Scope(value = "step", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class PostalReader implements ItemReader<PostalDto>, ItemStream{
private static final Logger LOGGER = LoggerFactory.getLogger(PostalReader.class);
@Value("#{jobParameters[fullPathFileName]}")
public String fileName;
private int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
private FlatFileItemReader<PostalDto> reader;
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
LOGGER.info("Executing batch reader...");
reader = new FlatFileItemReader<>();
reader.setResource(new FileSystemResource(fileName));
reader.setLinesToSkip(1);
reader.setLineMapper(new DefaultLineMapper<PostalDto>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(PostalHeader.getPostalColumnNames());
}});
setFieldSetMapper(new PostalFieldSetMapper());
}});
reader.setSaveState(true);
reader.open(stepExecution.getExecutionContext());
}
@Override
public PostalDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
reader.setCurrentItemCount(currentIndex++);
return reader.read();
}
@AfterStep
public void afterStep(StepExecution stepExecution) {
LOGGER.info("Closing the reader...");
reader.close();
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
if(executionContext.containsKey(CURRENT_INDEX)){
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
} else{
currentIndex = 0;
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
@Override
public void close() throws ItemStreamException {
}
}
作业重启代码:
@Override
public void restartJob(Long jobId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException {
LOGGER.info("Restarting job with JobId: {}", jobId);
jobOperator.restart(jobId);
}
如果您需要我这边的任何代码,请告诉我。
解决方案
FlatFileItemReader
您的自定义阅读器 ( ) 中使用的委托阅读器 ( PostalReader
) 不遵守ItemStream
合同。您需要在项目阅读器open/update/close
的相应open/update/close
方法中调用委托阅读器。就像是:
public class PostalReader implements ItemReader<PostalDto>, ItemStream{
private FlatFileItemReader<PostalDto> reader;
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
reader.open(executionContext);
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
reader.update(executionContext);
}
@Override
public void close() throws ItemStreamException {
reader.close();
}
}
推荐阅读
- postgresql - PGAdmin 使用每个“docker-compose up”创建新的随机卷
- python - 如何使用正则表达式删除字符串中的数字,但主题标签中的数字除外
- php - 数据不会在第一次加载
- javascript - 查询 Google Place API 以获取对话流响应
- google-maps - 如何在谷歌地图中切换 gmaps.js setContextMenu 内容
- azure - Azure DevOps“变量组”中定义的数组变量可以吗
- react-native - 在项目“:app”中找不到路径为“../node_modules/react-native-code-push/android”的项目
- ios - 执行 po 命令时会调用什么方法
- javascript - 如何在google appscript中的多个html文件上设置默认主页并将其发布为WebApp
- java - while(true) 循环只执行一次