Running a Spring Batch job in a foreach loop with dynamic parameters

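Problem description

I created a Spring Batch job with Spring Boot. I wrote a custom Reader that fetches JSON data from a REST API and converts it into Java objects, and a Writer that pushes the data to a queue. I launch the job inside a foreach loop, setting the job parameters on each iteration so that the request goes to the REST API with a different language. On the first iteration the job runs successfully, but on every later iteration it only reports that it has finished.

Batch configuration: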

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public RestWebClient webClient;

    // Reader that fetches the codes from the REST API
    @Bean
    public ItemReader<Code> reader() {
        return new CodeAndLabelRestItemReader(webClient);
    }

    @Bean
    public CodeAndLabelItemProcessor processor() {
        return new CodeAndLabelItemProcessor("France", "DP", "transaction");
    }

    // Writer that pushes each item to the queue
    @Bean
    public ItemWriter<CodeAndLabel> calWriter(AmqpTemplate amqpTemplate) {
        return new CodeAndLabelItemWriter(amqpTemplate);
    }

    @Bean(name = "importJob")
    public Job importCodesAndLabelsJob(JobCompletionNotificationListener listener, Step stepJms) {
        return jobBuilderFactory.get("importJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(stepJms)
                .end()
                .build();
    }

    @Bean
    public Step stepJms(ItemWriter<CodeAndLabel> writer) {
        return stepBuilderFactory.get("stepJms")
                .<Code, CodeAndLabel>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }
}

Reader:

public class CodeAndLabelRestItemReader implements ItemReader<Code> {

    private final RestWebClient webClient;

    // Index of the next code to hand out, and the list fetched from the REST API
    private int nextCodeIndex;
    private List<Code> codes;

    public CodeAndLabelRestItemReader(RestWebClient webClient) {
        this.webClient = webClient;
        nextCodeIndex = 0;
    }

    // Picks up the endpoint suffix from the job parameters before the step starts
    @BeforeStep
    public void beforeStep(final StepExecution stepExecution) {
        JobParameters jobParameters = stepExecution.getJobParameters();
        this.webClient.setEndPointSuffix(jobParameters.getString("endPointSuffix"));
    }

    // Returns the next code, or null once the list is exhausted
    @Override
    public Code read() {
        if (codesAndLabelsListNotInitialized()) {
            codes = webClient.getCodes();
        }

        Code nextCode = null;

        if (nextCodeIndex < codes.size()) {
            nextCode = codes.get(nextCodeIndex);
            nextCodeIndex++;
        }

        return nextCode;
    }

    private boolean codesAndLabelsListNotInitialized() {
        return this.codes == null;
    }
}

Processor:

public class CodeAndLabelItemProcessor implements ItemProcessor<Code, CodeAndLabel> {

    private String populationId;
    private String populationDataProvider;
    private String transactionId;

    public CodeAndLabelItemProcessor(String populationId, String populationDataProvider, String transactionId) {
        this.populationId = populationId;
        this.populationDataProvider = populationDataProvider;
        this.transactionId = transactionId;
    }

    // Copies the uid of the incoming Code into a new CodeAndLabel
    @Override
    public CodeAndLabel process(Code code) throws Exception {
        CodeAndLabel codeAndLabel = new CodeAndLabel();

        codeAndLabel.setUid(code.getUid());

        System.out.println("Converting (" + code + ") into (" + codeAndLabel + ")");

        return codeAndLabel;
    }
}

Writer:

public class CodeAndLabelItemWriter implements ItemWriter<CodeAndLabel> {

    // The declarations of `log` and BatchConfiguration.topicExchangeName are not shown in the question
    private AmqpTemplate template;

    public CodeAndLabelItemWriter(AmqpTemplate template) {
        this.template = template;
    }

    // Sends every item of the chunk to the RabbitMQ exchange
    @Override
    public void write(List<? extends CodeAndLabel> items) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Writing to RabbitMQ with " + items.size() + " items.");
        }

        for (CodeAndLabel item : items) {
            template.convertAndSend(BatchConfiguration.topicExchangeName, "com.batchprocessing.queue", item);
            System.out.println("item : " + item);
        }
    }
}

Listener:

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // Prints a message once the job has completed
    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println("JOB FINISHED");
        }
    }
}

And the class that launches the job:

@Component
public class Initialization {
    // some code here
    String[] languages = processLanguage.split(";");
    for (String language : languages) {
        // A fresh JobID per iteration makes each set of job parameters unique
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .addString("endPointSuffix", "/codeAndLabel".concat(language.toUpperCase()))
                .toJobParameters();
        jobLauncher.run(job, params);
    }

Output of the first iteration:

Converting (WFR.SP.2C) into (WFR.SP.2C)
Converting (WFR.SP.3E) into (WFR.SP.3E)
Converting (WFR.SP.FC) into (WFR.SP.FC)
Converting (WFR.SP.FD) into (WFR.SP.FD)
Converting (WFR.SP.FI) into (WFR.SP.FI)
Converting (WFR.SP.FM) into (WFR.SP.FM)
item : WFR.SP.2C
item : WFR.SP.3E
item : WFR.SP.FC 
item : WFR.SP.FD
item : WFR.SP.FI
item : WFR.SP.FM
JOB FINISHED

Output of the second iteration:

JOB FINISHED

I think that on the second iteration the job does not run the reader, processor, and writer beans, and I don't know why. Can anyone help?

Tags: java, spring, spring-boot, spring-batch, jobs

Solution
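
A likely cause, judging only from the code in the question: `reader()` is declared as a plain `@Bean`, so a single `CodeAndLabelRestItemReader` instance is shared by every job execution. After the first run `codes` is already populated and `nextCodeIndex` equals `codes.size()`, so on the second run `read()` returns `null` straight away and the step finishes without processing anything. The sketch below reproduces that behaviour in plain Java with no Spring dependency. `ReaderStateDemo`, `StatefulReader`, `runStep` and `reset` are hypothetical names used for illustration only, and the in-memory list is an assumed stand-in for `webClient.getCodes()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReaderStateDemo {

    /** Stand-in for the shared reader from the question: its state survives between runs. */
    static class StatefulReader {
        private final List<String> source; // assumption: replaces webClient.getCodes()
        private List<String> codes;
        private int nextCodeIndex = 0;

        StatefulReader(List<String> source) {
            this.source = source;
        }

        /** Hypothetical fix: clear the state at the start of every step execution. */
        void reset() {
            codes = null;
            nextCodeIndex = 0;
        }

        /** Same logic as read() in the question: null signals the end of the data. */
        String read() {
            if (codes == null) {
                codes = source;
            }
            String next = null;
            if (nextCodeIndex < codes.size()) {
                next = codes.get(nextCodeIndex);
                nextCodeIndex++;
            }
            return next;
        }
    }

    /** Mimics one step execution: read until the reader returns null. */
    static List<String> runStep(StatefulReader reader, boolean resetFirst) {
        if (resetFirst) {
            reader.reset();
        }
        List<String> items = new ArrayList<>();
        for (String item = reader.read(); item != null; item = reader.read()) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) {
        StatefulReader shared = new StatefulReader(Arrays.asList("WFR.SP.2C", "WFR.SP.3E"));

        System.out.println(runStep(shared, false).size()); // first run: 2 items
        System.out.println(runStep(shared, false).size()); // second run, stale state: 0 items
        System.out.println(runStep(shared, true).size());  // run after reset: 2 items
    }
}
```

In the real job, the counterpart of `reset()` would be clearing `codes` and `nextCodeIndex` inside the existing `@BeforeStep` method. Another option is to annotate the reader bean with `@StepScope` so that Spring Batch creates a new reader for each step execution. With a step-scoped bean it may be necessary to declare the concrete class as the return type of the bean method so that the `@BeforeStep` annotation is still detected.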

