java - 为什么在春季批处理中不调用拆分流?
问题描述
我有以下工作配置:
@Bean
public Job job(Step databaseToDataBaseLowercaseSlaveStep) {
return jobBuilderFactory.get("myJob")
.incrementer(new RunIdIncrementer())
.flow(csvToDbLowercaseStep())
.next(databaseToDataBaseLowercaseSlaveStep)
.split(jobTaskExecutor())
.add(new FlowBuilder<Flow>("flow2")
.start(notificationStep())
.build()
)
.end()
.build();
}
预期的行动顺序:
- 执行
csvToDbLowercaseStep
- 并行运行 2 个步骤 a)
databaseToDataBaseLowercaseSlaveStep
b)notificationStep
动作的实际顺序:
- 执行
csvToDbLowercaseStep
- 执行
databaseToDataBaseLowercaseSlaveStep
所以notificationStep
根本没有被调用。为什么?我该如何纠正它?
更新
@Bean
public TaskExecutor jobTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// there are 21 sites currently hence we have 21 threads
taskExecutor.setMaxPoolSize(30);
taskExecutor.setCorePoolSize(25);
taskExecutor.setThreadGroupName("cust-job-exec-");
taskExecutor.setThreadNamePrefix("cust-job-exec-");
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
解决方案
不支持向状态链添加拆分,这是执行所需操作的正确方法(任务 1 同步,然后是任务 2 和 3 并行):
public Job job() {
final Flow masterFlow = new FlowBuilder<Flow>("flow1").start(step("step1")).build();
final Flow slaveFlow = new FlowBuilder<Flow>("flow2").split(new SimpleAsyncTaskExecutor())
.add(
new FlowBuilder<Flow>("flow2.1").start(step("step2.1")).build(),
new FlowBuilder<Flow>("flow2.2").start(step("step2.2")).build())
.build();
return (jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(masterFlow)
.next(slaveFlow)
.build())
.build();
}
private TaskletStep step(final String name) {
return stepBuilderFactory.get(name)
.tasklet((StepContribution contribution, ChunkContext chunkContext) -> {
System.out.println(name + " start");
Thread.sleep(1000);
System.out.println(name + " end");
return RepeatStatus.FINISHED;
})
.build();
}
step1 开始
step1 结束
step2.1 开始
step2.2 开始
step2.1 结束
step2.2 结束
希望这可以帮助。
更新
您的代码正在尝试将拆分添加到状态链中,并且根据 FlowBuilder.SplitBuilder 的文档,它根本不受支持。
* <em>Note:</em> Adding a split to a chain of states is not supported. For example, the following configuration
* is not supported. Instead, the configuration would need to create a flow3 that was the split flow and assemble
* them separately.
*
* <pre>
* // instead of this
* Flow complexFlow = new FlowBuilder<SimpleFlow>("ComplexParallelFlow")
* .start(flow1)
* .next(flow2)
* .split(new SimpleAsyncTaskExecutor())
* .add(flow3, flow4)
* .build();
*
* // do this
* Flow splitFlow = new FlowBuilder<SimpleFlow>("parallelFlow")
* .start(flow3)
* .split(new SimpleAsyncTaskExecutor())
* .add(flow4).build();
*
* Flow complexFlow = new FlowBuilder<SimpleFlow>("ComplexParallelFlow")
* .start(flow1)
* .next(flow2)
* .next(splitFlow)
* .build();
* </pre>
推荐阅读
- c++ - C中的IsDigit()将char读取为C中的数字
- android-studio - android stuido中的Unicode字符
- reactjs - 单击编辑按钮时Redux初始值为null
- c# - 如何将对象模型(其中包含对象模型数组)传递给 API 以作为参数插入
- ruby-on-rails - JSONB 整数数组包含值
- python - 当没有给出 url 参数时,如何在 Django 视图中将当前登录用户设置为默认用户
- python - 我如何在 Windows 10 上获取 ORACLE 11 g DATABASE 的 IP 地址以远程连接到它?
- kotlin - 过滤列表然后更改类型
- php - 将 12 个月添加到 2 月 29 日
- c# - CrystalDecisions.Shared.CrystalReportsException:加载报告失败。文件未打开