首页 > 解决方案 > 如何在单个线程中执行一系列步骤,并在弹簧集成中使用异步流程?

问题描述

我目前有一个弹簧集成(v4.3.24)流程,如下所示:

           |
           | list of
           | filepaths
      +----v---+
      |splitter|
      +----+---+
           | filepath
           |
+----------v----------+
|sftp-outbound-gateway|
|        "get"        |
+----------+----------+
           | file
+---------------------+
|     +----v----+     |
|     |decryptor|     |
|     +----+----+     |
|          |          |
|    +-----v------+   | set of transformers
|    |decompressor|   | (with routers before them
|    +-----+------+   | because some steps are optional)
|          |          | that process the file;
|       +--v--+       | call this "FileProcessor"
|       | ... |       |
|       +--+--+       |
+---------------------+
           |
      +----v----+
      |save file|
      | to disk |
      +----+----+
           |

上面所有的频道都是DirectChannels - 是的,我知道这是一个糟糕的结构。这对于少量文件来说效果很好。但是现在,我必须处理需要经过相同流程的数千个文件 - 基准测试表明这需要大约 1 天才能完成处理。所以,我打算在这个流程中引入一些并行处理。我想修改我的流程以实现以下目标:

                                    |
                                    |
                         +----------v----------+
                         |sftp-outbound-gateway|
                         |       "mget"        |
                         +----------+----------+
                                    | list of files
                                    |
                               +----v---+
                               |splitter|
                               +----+---+
         one thread             one | thread        ...
           +------------------------+---------------+--+--+--+--+
           | file                   | file          |  |  |  |  |
+---------------------+  +---------------------+
|     +----v----+     |  |     +----v----+     |
|     |decryptor|     |  |     |decryptor|     |
|     +----+----+     |  |     +----+----+     |
|          |          |  |          |          |
|    +-----v------+   |  |    +-----v------+   |   ...
|    |decompressor|   |  |    |decompressor|   |
|    +-----+------+   |  |    +-----+------+   |
|          |          |  |          |          |
|       +--v--+       |  |       +--v--+       |
|       | ... |       |  |       | ... |       |
|       +--+--+       |  |       +--+--+       |
+---------------------+  +---------------------+
           |                        |
      +----v----+              +----v----+
      |save file|              |save file|
      | to disk |              | to disk |
      +----+----+              +----+----+
           |                        |
           |                        |

对于并行处理,我将文件从拆分器输出到ExecutorChannel带有ThreadPoolTaskExecutor.

我的一些问题:

  1. 我希望一个文件的所有“FileProcessor”步骤发生在同一个线程上,同时并行处理多个文件。我怎样才能做到这一点?
    我从这个答案中看到ExecutorChannelMessageHandlerChain流将提供这样的功能。但是,“FileProcessor”中的一些步骤是可选的(selector-expression与路由器一起使用以跳过一些步骤) - 排除使用MessageHandlerChain. 我可以在 s 里面安装几个MessageHandlerChains Filter,但这或多或少成为 #2 中提到的方法。

  2. 如果无法实现#1,是否会更改从拆分器开始的所有通道类型,DirectChannelExecutorChannel帮助引入一些并行性?如果是,我应该TaskExecutor为每个通道创建一个新的,还是可以为所有通道重用一个TaskExecutorbean(我不能scope="prototype"TaskExecutorbean 上设置)?

  3. 在您看来,哪种方法(#1 或 #2)更好?为什么?

  4. 如果我执行全局错误处理,就像这里提到的方法一样,即使一个文件出错,其他文件是否会继续处理?

标签: spring-integration

解决方案


ExecutorChannel通过使用 an作为解密器的输入并将所有其余部分保留为直接通道,它将根据您的需要工作;剩余的流程不必是一个链,每个组件将在执行器的一个线程上运行。

您需要确保所有下游组件都是线程安全的。

错误处理应保持原样;每个子流程都是独立的。


推荐阅读