首页 > 解决方案 > org.springframework.batch.core.JobExecutionException:分区处理程序返回一个不成功的步骤

问题描述

我正在学习 Spring Batch,我能够创建简单的单步应用程序(github repo 链接

此应用程序包含以下工作:
1. 从 csv 文件中读取人员
2. 小写他们的姓名
3. 将他们保存到数据库 中

现在我想学习分区功能,所以我添加了以下分区器:

@Component
public class MyPartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>(gridSize);
        for (int k = 0; k < gridSize; k++) {
            ExecutionContext context = new ExecutionContext();
            context.putString("keyName", "key_" + k); //Depends on what logic you want to use to split
            map.put("PARTITION_KEY" + k, context);
        }
        return map;
    }
}

我的配置如下所示:

@Bean
public Job job() {
    return jobBuilderFactory.get("myJob")
            .incrementer(new RunIdIncrementer())
            .flow(demoPartitionStep())
            .end()
            .build();
}

private Step demoPartitionStep() {
    return stepBuilderFactory.get("demoPartitionStep")
            .partitioner("demoPartitionStep", myPartitioner)
            .gridSize(21)
            .step(csvToDataBaseStep())
            .taskExecutor(jobTaskExecutor())
            .build();
}

private Step csvToDataBaseStep() {
    return stepBuilderFactory.get("csvToDatabaseStep")
            .<Person, Person>chunk(10)
            .reader(csvPersonReader())
            .processor(toLowerCasePersonProcessor)
            .writer(dbPersonWriter)
            .build();

}

public FlatFileItemReader csvPersonReader() {
    return new FlatFileItemReaderBuilder()
            .name("csvPersonReader")
            .resource(csvResource)
            .delimited()
            .names(new String[]{"firstName", "lastName"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }})
            .build();

}

@Bean
public TaskExecutor jobTaskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    // there are 21 sites currently hence we have 21 threads
    taskExecutor.setMaxPoolSize(30);
    taskExecutor.setCorePoolSize(25);
    taskExecutor.setThreadGroupName("custom-executor");
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

当我启动应用程序时,我在日志中看到以下内容:

2019-08-05 19:25:22.303 ERROR 24100 --- [bTaskExecutor-2] o.s.batch.core.step.AbstractStep         : Encountered an error executing step csvToDatabaseStep in job myJob

org.springframework.batch.item.file.NonTransientFlatFileException: Unable to read from resource: [class path resource [users.csv]]
    at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:220) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:92) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Caused by: java.io.IOException: Stream closed
    at java.io.BufferedReader.ensureOpen(BufferedReader.java:122) ~[na:1.8.0_111]
    at java.io.BufferedReader.readLine(BufferedReader.java:317) ~[na:1.8.0_111]
    at java.io.BufferedReader.readLine(BufferedReader.java:389) ~[na:1.8.0_111]
    at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:201) ~[spring-batch-infrastructure-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    ... 26 common frames omitted

和以下:

2019-08-05 19:25:22.319 ERROR 24100 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step demoPartitionStep in job myJob

org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
    at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:136) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_111]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_111]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_111]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_111]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at com.sun.proxy.$Proxy77.run(Unknown Source) [na:na]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:206) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:180) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:167) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:162) [spring-boot-autoconfigure-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:779) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:763) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:318) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at spring.boot.hello.world.MyApplication.main(MyApplication.java:9) [main/:na]

我注意到一些数据已成功插入数据库。我的 csv 文件包含 500 人,但在 sql 数据库中可能插入 210 行或 332 甚至 600 行(我在将块大小设置为 100 时看到它)

如何正确实施分区?我错了什么?

更新

我试图用标记 csvPersonReader@StepScope错误消失但是

rowCountInDatabaseTable=gridSize * rowCountInCsvFile

我仍在寻找解决方案

标签: javaspring-bootspring-batch

解决方案


实际上,我自己并没有使用过partioner,但我也许可以给你一些提示。

您遇到的第一个异常(流已关闭)是因为每个从属进程都使用了相同的读取器实例。显然,第一个完成的从属进程关闭了阅读器,从那一刻起,其他从属进程试图从一个关闭的流中读取。

您使用 StepScope-Annotation 解决了这个问题,这是解决此问题的正确方法。

分区方法的问题在于,您负责对读取的数据进行分区。

您所做的只是创建一个具有 20 个从属进程的分区程序,并且这些从属进程中的每个人都读取整个文件。因此,您的数据库包含文件中的每个条目 20 次。

您应该做的是使用适当的“上下文属性”配置每个步骤实例。基于这些上下文属性,步骤实例知道它应该处理哪些行(例如开始行和结束行)。或者您也可以将原始文件拆分为 20 个具有不同名称的文件,并为每个实例提供另一个文件名。

我找到了两个解释这一点的例子,一个是从数据库中读取的,一个是从文件中读取的:

https://www.mkyong.com/spring-batch/spring-batch-partitioning-example/

https://www.baeldung.com/spring-batch-partitioner


推荐阅读