首页 > 解决方案 > Spring Batch 非法状态(仅在竞争条件下发生):作业执行已在运行

问题描述

在我的 Spring Batch 应用程序中,我使用 PostgreSQL 作为作业存储库和以下登录以重新启动未完成的作业:

try {
    jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

    List<String> jobs = jobExplorer.getJobNames();
    for (String job : jobs) {

    Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(job);

    for (JobExecution jobExecution : jobExecutions) {

        jobExecution.setStatus(BatchStatus.STOPPED);
        jobExecution.setEndTime(new Date());
        jobRepository.update(jobExecution);

        Long jobExecutionId = jobExecution.getId();
        jobOperator.restart(jobExecutionId);
    }
    }
} catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
}

但是这个逻辑失败了,但有以下例外:

2018-08-01 14:33:21.777 错误 32306 --- [main] cvpdservice.batch.BatchServiceImpl:非法状态(仅在竞争条件下发生):作业执行已经在运行,名称=documetPipelineJob 和参数=

这里可能有什么问题以及如何解决?

更新

看起来jobRepository.update(jobExecution);没有提交对数据库的更改。如何正确提交对数据库的更改?顺便说一句 - 这个逻辑适用于 H2 内存数据库。

标签: springspring-bootspring-batch

解决方案


为了解决这个问题,我添加了incrementer(new RunIdIncrementer())

jobBuilderFactory.get("documetPipelineJob")
        .incrementer(new RunIdIncrementer())
        .start(initStep)

我还扩展了重启逻辑以停止运行步骤:

try {
    jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

    List<String> jobs = jobExplorer.getJobNames();
    for (String job : jobs) {

    Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(job);

    for (JobExecution jobExecution : jobExecutions) {

        Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();
        for (StepExecution stepExecution : stepExecutions) {
        BatchStatus status = stepExecution.getStatus();
        if (status.isRunning() || status == BatchStatus.STOPPING) {
            stepExecution.setStatus(BatchStatus.STOPPED);
            stepExecution.setEndTime(new Date());
            jobRepository.update(stepExecution);
        }
        }

        jobExecution.setStatus(BatchStatus.STOPPED);
        jobExecution.setEndTime(new Date());
        jobRepository.update(jobExecution);

        Long jobExecutionId = jobExecution.getId();

        jobOperator.restart(jobExecutionId);
    }
    }
} catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
}

然后,我遇到了以下文章中描述的问题:序列化事务问题我同时运行了几个批处理作业。

JobRepository我通过配置来修复它ISOLATION_READ_UNCOMMITTED

@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_READ_UNCOMMITTED");
    factory.afterPropertiesSet();
    return factory.getObject();
}

推荐阅读