spring-integration - 如何在单个线程中执行一系列步骤,并在弹簧集成中使用异步流程?
问题描述
我目前有一个弹簧集成(v4.3.24)流程,如下所示:
|
| list of
| filepaths
+----v---+
|splitter|
+----+---+
| filepath
|
+----------v----------+
|sftp-outbound-gateway|
| "get" |
+----------+----------+
| file
+---------------------+
| +----v----+ |
| |decryptor| |
| +----+----+ |
| | |
| +-----v------+ | set of transformers
| |decompressor| | (with routers before them
| +-----+------+ | because some steps are optional)
| | | that process the file;
| +--v--+ | call this "FileProcessor"
| | ... | |
| +--+--+ |
+---------------------+
|
+----v----+
|save file|
| to disk |
+----+----+
|
上面所有的频道都是DirectChannel
s - 是的,我知道这是一个糟糕的结构。这对于少量文件来说效果很好。但是现在,我必须处理需要经过相同流程的数千个文件 - 基准测试表明这需要大约 1 天才能完成处理。所以,我打算在这个流程中引入一些并行处理。我想修改我的流程以实现以下目标:
|
|
+----------v----------+
|sftp-outbound-gateway|
| "mget" |
+----------+----------+
| list of files
|
+----v---+
|splitter|
+----+---+
one thread one | thread ...
+------------------------+---------------+--+--+--+--+
| file | file | | | | |
+---------------------+ +---------------------+
| +----v----+ | | +----v----+ |
| |decryptor| | | |decryptor| |
| +----+----+ | | +----+----+ |
| | | | | |
| +-----v------+ | | +-----v------+ | ...
| |decompressor| | | |decompressor| |
| +-----+------+ | | +-----+------+ |
| | | | | |
| +--v--+ | | +--v--+ |
| | ... | | | | ... | |
| +--+--+ | | +--+--+ |
+---------------------+ +---------------------+
| |
+----v----+ +----v----+
|save file| |save file|
| to disk | | to disk |
+----+----+ +----+----+
| |
| |
对于并行处理,我将文件从拆分器输出到ExecutorChannel
带有ThreadPoolTaskExecutor
.
我的一些问题:
我希望一个文件的所有“FileProcessor”步骤发生在同一个线程上,同时并行处理多个文件。我怎样才能做到这一点?
我从这个答案中看到ExecutorChannel
,MessageHandlerChain
流将提供这样的功能。但是,“FileProcessor”中的一些步骤是可选的(selector-expression
与路由器一起使用以跳过一些步骤) - 排除使用MessageHandlerChain
. 我可以在 s 里面安装几个MessageHandlerChain
sFilter
,但这或多或少成为 #2 中提到的方法。如果无法实现#1,是否会更改从拆分器开始的所有通道类型,
DirectChannel
以ExecutorChannel
帮助引入一些并行性?如果是,我应该TaskExecutor
为每个通道创建一个新的,还是可以为所有通道重用一个TaskExecutor
bean(我不能scope="prototype"
在TaskExecutor
bean 上设置)?在您看来,哪种方法(#1 或 #2)更好?为什么?
如果我执行全局错误处理,就像这里提到的方法一样,即使一个文件出错,其他文件是否会继续处理?
解决方案
ExecutorChannel
通过使用 an作为解密器的输入并将所有其余部分保留为直接通道,它将根据您的需要工作;剩余的流程不必是一个链,每个组件将在执行器的一个线程上运行。
您需要确保所有下游组件都是线程安全的。
错误处理应保持原样;每个子流程都是独立的。
推荐阅读
- javascript - 如何使用javascript将参数传递给目标url
- reactjs - 如何在 Ag-grid 中按 ObjectID (_id) 过滤记录
- arrays - 如何从具有多个维度的numpy数组中删除重复项
- node.js - 如何在 docker CMD 命令中运行多个 npm 脚本
- c# - 模型属性更改时的 WPF 调用方法
- macos - 编写脚本以删除目录的内容
- php - 模型内的条件关系
- java - 有没有办法只允许一个范围内的函数的 Int 参数
- javascript - 如何处理多个事件以动画 DOM 中的同一元素?
- node.js - 如何在整个应用程序中使用会话变量