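java - Running a Spring Batch job in a foreach loop with dynamic parameters
Problem description
I created a Spring Batch job with Spring Boot. I customized the Reader to fetch JSON data from a REST API and convert it to Java objects, and the Writer pushes the data to a queue. I launch the job inside a foreach loop, setting the parameters on each pass so that the request goes to the REST API with a different language. The first iteration runs successfully, but every later iteration only reports that the job has finished.
Batch configuration: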
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public RestWebClient webClient;

    // Singleton-scoped by default: one reader instance is shared by every job execution.
    @Bean
    public ItemReader<Code> reader() {
        return new CodeAndLabelRestItemReader(webClient);
    }

    @Bean
    public CodeAndLabelItemProcessor processor() {
        return new CodeAndLabelItemProcessor("France", "DP", "transaction");
    }

    @Bean
    public ItemWriter<CodeAndLabel> calWriter(AmqpTemplate amqpTemplate) {
        return new CodeAndLabelItemWriter(amqpTemplate);
    }

    @Bean(name = "importJob")
    public Job importCodesAndLabelsJob(JobCompletionNotificationListener listener, Step stepJms) {
        return jobBuilderFactory.get("importJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(stepJms)
                .end()
                .build();
    }

    // Chunk-oriented step: read Code items, convert them, write CodeAndLabel items in chunks of 10.
    @Bean
    public Step stepJms(ItemWriter<CodeAndLabel> writer) {
        return stepBuilderFactory.get("stepJms")
                .<Code, CodeAndLabel>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }
}
Reader:
public class CodeAndLabelRestItemReader implements ItemReader<Code> {

    private final RestWebClient webClient;
    private int nextCodeIndex;   // position of the next item to return
    private List<Code> codes;    // fetched lazily on the first read() call

    public CodeAndLabelRestItemReader(RestWebClient webClient) {
        this.webClient = webClient;
        nextCodeIndex = 0;
    }

    // Points the web client at the endpoint passed in through the job parameters.
    @BeforeStep
    public void beforeStep(final StepExecution stepExecution) {
        JobParameters jobParameters = stepExecution.getJobParameters();
        this.webClient.setEndPointSuffix(jobParameters.getString("endPointSuffix"));
    }

    // Returns the next code, or null once the list is exhausted (null ends the step).
    @Override
    public Code read() {
        if (codesAndLabelsListNotInitialized()) {
            codes = webClient.getCodes();
        }
        Code nextCode = null;
        if (nextCodeIndex < codes.size()) {
            nextCode = codes.get(nextCodeIndex);
            nextCodeIndex++;
        }
        return nextCode;
    }

    private boolean codesAndLabelsListNotInitialized() {
        return this.codes == null;
    }
}
Processor:
public class CodeAndLabelItemProcessor implements ItemProcessor<Code, CodeAndLabel> {

    private String populationId;
    private String populationDataProvider;
    private String transactionId;

    public CodeAndLabelItemProcessor(String populationId, String populationDataProvider, String transactionId) {
        this.populationId = populationId;
        this.populationDataProvider = populationDataProvider;
        this.transactionId = transactionId;
    }

    // Copies the uid from the incoming Code into a new CodeAndLabel.
    @Override
    public CodeAndLabel process(Code code) throws Exception {
        CodeAndLabel codeAndLabel = new CodeAndLabel();
        codeAndLabel.setUid(code.getUid());
        System.out.println("Converting (" + code + ") into (" + codeAndLabel + ")");
        return codeAndLabel;
    }
}
Writer:
public class CodeAndLabelItemWriter implements ItemWriter<CodeAndLabel> {

    // Note: `log` is not declared in the posted snippet; it is presumably a logger field defined elsewhere.
    private AmqpTemplate template;

    public CodeAndLabelItemWriter(AmqpTemplate template) {
        this.template = template;
    }

    // Sends each item of the chunk to the RabbitMQ exchange.
    @Override
    public void write(List<? extends CodeAndLabel> items) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Writing to RabbitMQ with " + items.size() + " items.");
        }
        for (CodeAndLabel item : items) {
            template.convertAndSend(BatchConfiguration.topicExchangeName, "com.batchprocessing.queue", item);
            System.out.println("item : " + item);
        }
    }
}
Listener:
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // Prints a marker once the job execution reaches COMPLETED.
    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println("JOB FINISHED");
        }
    }
}
And the class that runs the job:
@Component
public class Initialization {
    // some code here
    String[] languages = processLanguage.split(";");
    // Launch the job once per language, with a distinct endpoint suffix each time.
    for (String language : languages) {
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .addString("endPointSuffix",
                        "/codeAndLabel".concat(language.toUpperCase()))
                .toJobParameters();
        jobLauncher.run(job, params);
    }
Output, first iteration:
Converting (WFR.SP.2C) into (WFR.SP.2C)
Converting (WFR.SP.3E) into (WFR.SP.3E)
Converting (WFR.SP.FC) into (WFR.SP.FC)
Converting (WFR.SP.FD) into (WFR.SP.FD)
Converting (WFR.SP.FI) into (WFR.SP.FI)
Converting (WFR.SP.FM) into (WFR.SP.FM)
item : WFR.SP.2C
item : WFR.SP.3E
item : WFR.SP.FC
item : WFR.SP.FD
item : WFR.SP.FI
item : WFR.SP.FM
JOB FINISHED
Second iteration:
JOB FINISHED
I think the job does not run the Reader, Processor, and Writer beans in the second iteration, and I don't know why. Can anyone help?
Solution
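The page does not include a confirmed answer, so what follows is a likely explanation rather than a verified one. The reader is registered as an ordinary (singleton) bean, and it keeps `codes` and `nextCodeIndex` as instance fields that are never reset. After the first run the index already points past the end of the cached list, so the second run's first `read()` returns null and the step ends at once with nothing to process. The sketch below reproduces that behaviour in plain Java, without Spring. `StatefulReader`, `ReaderDemo`, and the `Supplier` standing in for the REST call are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for CodeAndLabelRestItemReader: same index/list logic,
// with the REST call replaced by a Supplier so the sketch runs without Spring.
class StatefulReader {
    private final Supplier<List<String>> source;
    private List<String> codes;      // cached after the first read, never cleared
    private int nextCodeIndex = 0;   // never reset between runs

    StatefulReader(Supplier<List<String>> source) {
        this.source = source;
    }

    // Same contract as ItemReader.read(): null signals "no more input".
    String read() {
        if (codes == null) {
            codes = source.get();
        }
        return nextCodeIndex < codes.size() ? codes.get(nextCodeIndex++) : null;
    }
}

class ReaderDemo {
    // Drains the reader the way a chunk-oriented step would: read until null.
    static List<String> runStep(StatefulReader reader) {
        List<String> items = new ArrayList<>();
        for (String item = reader.read(); item != null; item = reader.read()) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) {
        Supplier<List<String>> api = () -> Arrays.asList("WFR.SP.2C", "WFR.SP.3E");

        // One shared instance, as with a singleton bean: the second run is empty.
        StatefulReader shared = new StatefulReader(api);
        System.out.println("shared, run 1: " + runStep(shared));
        System.out.println("shared, run 2: " + runStep(shared));

        // A fresh instance per run: every run sees the full list again.
        System.out.println("fresh,  run 1: " + runStep(new StatefulReader(api)));
        System.out.println("fresh,  run 2: " + runStep(new StatefulReader(api)));
    }
}
```

With the shared instance the second run yields an empty list, which matches the second iteration printing only JOB FINISHED. In Spring Batch, the usual way to get a fresh reader for each execution is to annotate the reader's `@Bean` method with `@StepScope`. Declaring that method's return type as the concrete `CodeAndLabelRestItemReader` rather than `ItemReader<Code>` should keep the `@BeforeStep` method visible on the scoped proxy. Resetting `codes` and `nextCodeIndex` inside `beforeStep` would be an alternative, assuming that callback is actually invoked for each execution.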