spring-integration - 使用 Spring Integration 从远程 SFTP 目录和子目录流式传输
问题描述
我正在使用 Spring Integration Streaming Inbound Channel Adapter,从远程 SFTP 获取流并解析每一行内容过程。
我用 :
IntegrationFlows.from(Sftp.inboundStreamingAdapter(template)
.filter(remoteFileFilter)
.remoteDirectory("test_dir"),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(fetchInt)))
.handle(Files.splitter(true, true))
....
它现在可以工作了。但是我只能从test_dir
目录中获取文件,但是我需要递归地从这个目录和子目录中获取文件并解析每一行。
我注意到Inbound Channel Adapter
which 是Sftp.inboundAdapter(sftpSessionFactory).scanner(...)
. 它可以扫描子目录。但我没有看到任何东西Streaming Inbound Channel Adapter
。
那么,我怎样才能实现“从目录中递归获取文件”Streaming Inbound Channel Adapter
呢?
谢谢。
解决方案
你可以使用两个出站网关——第一个做ls -R
(递归列表);拆分结果并使用配置的网关mget -stream
来获取每个文件。
编辑
@SpringBootApplication
public class So60987851Application {
public static void main(String[] args) {
SpringApplication.run(So60987851Application.class, args);
}
@Bean
IntegrationFlow flow(SessionFactory<LsEntry> csf) {
return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5_000)))
.handle(Sftp.outboundGateway(csf, Command.LS, "payload")
.options(Option.RECURSIVE, Option.NAME_ONLY)
// need a more robust metadata store for persistence, unless the files are removed
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test")))
.split()
.log()
.enrichHeaders(headers -> headers.headerExpression("fileToRemove", "'foo/' + payload"))
.handle(Sftp.outboundGateway(csf, Command.GET, "'foo/' + payload")
.options(Option.STREAM))
.split(new FileSplitter())
.log()
// instead of a filter, we can remove the remote file.
// but needs some logic to wait until all lines read
// .handle(Sftp.outboundGateway(csf, Command.RM, "headers['fileToRemove']"))
// .log()
.get();
}
@Bean
CachingSessionFactory<LsEntry> csf(DefaultSftpSessionFactory sf) {
return new CachingSessionFactory<>(sf);
}
@Bean
DefaultSftpSessionFactory sf() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost("10.0.0.8");
sf.setUser("gpr");
sf.setPrivateKey(new FileSystemResource(new File("/Users/grussell/.ssh/id_rsa")));
sf.setAllowUnknownKeys(true);
return sf;
}
}
推荐阅读
- spring-boot - spring-boot中的ehcache服务器同步
- reference - 在 Kotlin 中将 ArrayList 转换为 List 时的不变性问题
- ios - iOS Unity Cloud Build + Firebase 数据库
- php - 从 wordpress 帖子中获取全尺寸图片
- html - HTML5/CSS3 - 左边距问题
- azure - Azure 文件共享和软链接
- hive - Hive Join 卡在 Map 阶段
- sql - Sql查询选择同一日期的数据
- azure-active-directory - 通过 REST API 调整与 Intune 相关的 RBAC 资源
- python - 大熊猫合并两列被视为“集合”