首页 > 解决方案 > Spring Batch:使用 AsyncItemProcessor 的多线程步骤不会并行运行

问题描述

TL;博士

给定一个包含一百万个记录的文件,其中在文件的每一行上要执行大量逻辑,读取文件并在每一行上完成应用逻辑的最快方法是什么。我使用了文件阅读器的多线程步骤,其read方法是synchronized读取文件,并且还使用了一个AsynItemProcessor以便在其自己的线程中处理记录。

我的期望是,AsynItemProcessor一旦有读者的记录要处理,就应该立即开始。每条记录都应该在自己的线程中处理;但是,在我下面的示例中似乎并非如此


我的 Spring 批处理作业中有一个步骤,它使用TaskExecutor20 个线程和 10000 的提交间隔来读取文件。我也使用AsycnItemProcessorandAsyncItemWriter因为数据处理有时会比从文件中读取一行所需的时间长。

<step id="aggregationStep">
    <tasklet throttle-limit="20" task-executor="taskExecutor">
        <chunk reader="fileReader"
            processor="asyncProcessor" writer="asyncWriter"
            commit-interval="10000" />
    </tasklet>
</step>

在哪里 :

  1. fileReader是一个扩展的类,FlatFileItemReader方法readsynchronized并且只是在其中调用super.read
  2. asyncProcessor只不过是一个AsyncItemProcessorbean,它从文件中传递每一行并通过一个键对其进行分组,并将其存储到一个包含Map<String,BigDecimal>对象的单例 bean 中。换句话说,处理器只是简单地将文件数据按几列分组并将这些数据存储在内存中。
  3. asyncWriter只不过是在AsyncItemWriter其中包装了 no 操作的 an ItemWriter。换句话说,该作业不需要进行任何类型的写入,因为处理器本身正在执行聚合并将数据存储在内存中。( Map)。
  4. 请注意, theAsynItemProcessor有它自己的ThreadPoolTaskExecutorcorePoolSize=10andmaxPoolSize=20和 theStep有它自己的ThreadPoolTaskExecutorwith a corePoolSize=20andmaxPoolSize=40

通过上述设置,我的预期是读取和处理将并行发生。就像是 :

  1. FileReader 从文件中读取一条记录并将其传递给处理器
  2. AsyncItemProcessor执行聚合。既然是AsyncItemProcessor,那么调用该process方法的线程理想情况下应该可以自由地做其他工作吗?
  3. 最后,AsynItemWriter将获取Future并提取数据但什么也不做,因为委托是无操作的ItemWriter

但是当我添加一些日志时,我没有看到我所期望的:

2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 2500 records 2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 5000 records 2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 7501 records 2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 10000 records 2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 12500 records 2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 15000 records

... 更多这样的行被打印,直到整个文件被读取。只有在读取文件后,我才开始看到处理器开始工作:

2019-09-07 10:06:53 INFO FileProcessor:83 - Finished processing 2500 records 2019-09-07 10:08:28 INFO FileProcessor:83 - Finished processing 5000 records 2019-09-07 10:10:04 INFO FileProcessor:83 - Finished processing 7500 records 2019-09-07 10:11:40 INFO FileProcessor:83 - Finished processing 10000 records 2019-09-07 10:13:16 INFO FileProcessor:83 - Finished processing 12500 records 2019-09-07 10:14:51 INFO FileProcessor:83 - Finished processing 15000 records

底线:为什么在文件完全读取之前处理器不启动?无论ThreadPoolTaskExecutor用于AsynItemProcessor或 整个的参数是什么step,读取总是首先完成,然后才开始处理。

标签: javaspringspring-batchthreadpoolexecutoritemprocessor

解决方案


这就是面向块的处理的工作原理。该步骤将读取变量中的 X 项(其中 X 是提交间隔),然后执行处理/写入。你可以在代码中ChunkOrientedTasklet看到。

在多线程步骤中,每个块将由一个线程处理。


推荐阅读