spring - spring cloud stream文件源码app - 子目录下已处理文件和轮询文件的历史
问题描述
我在管道开始时使用 Spring Cloud Stream File Source 应用程序构建数据管道。我需要一些帮助来解决一些缺失的功能
我的文件源应用程序(基于 org.springframework.cloud.stream.app:spring-cloud-starter-stream-source-file)运行良好,除了缺少我需要帮助的功能。我需要
- 轮询和发送消息后删除文件
- 轮询子目录
关于第 1 项,我读到文件源应用程序中不存在删除功能(它在 sftp 源中可用)。每次重启应用都会重新拾取之前处理过的文件,是否可以将处理过的文件历史永久化?有没有简单的替代方案?
解决方案
为了支持这些要求,您肯定需要修改上述文件源项目的代码:https ://docs.spring.io/spring-cloud-stream-app-starters/docs/Einstein.BUILD-SNAPSHOT/reference/htmlsingle/ #_patching_pre_built_applications
我建议分叉项目并按原样从 GitHub 轮询它,因为您要修改项目的现有代码。然后,您按照上述文档中的说明如何构建与 SCDF 环境兼容的目标绑定器特定工件。
现在关于问题:
要轮询相同文件模式的子目录,您需要RecursiveDirectoryScanner
在Files.inboundAdapter()
:
/**
* Specify a custom scanner.
* @param scanner the scanner.
* @return the spec.
* @see FileReadingMessageSource#setScanner(DirectoryScanner)
*/
public FileInboundChannelAdapterSpec scanner(DirectoryScanner scanner) {
请注意,所有的都filters
必须在此配置DirectoryScanner
。否则会有警告:
// Check that the filter and locker options are _NOT_ set if an external scanner has been set.
// The external scanner is responsible for the filter and locker options in that case.
Assert.state(!(this.scannerExplicitlySet && (this.filter != null || this.locker != null)),
() -> "When using an external scanner the 'filter' and 'locker' options should not be used. " +
"Instead, set these options on the external DirectoryScanner: " + this.scanner);
要跟踪文件,最好考虑使用FileSystemPersistentAcceptOnceFileListFilter
基于外部持久性存储的ConcurrentMetadataStore
实现:https ://docs.spring.io/spring-integration/reference/html/#metadata-store 。必须使用 this 而不是 that preventDuplicates()
,因为也FileSystemPersistentAcceptOnceFileListFilter
确保我们只使用一次逻辑。
发送后删除文件可能不是这种情况,因为您可以File
按原样发送,并且它必须在另一端可用。
此外,您可以将 a 添加ChannelInterceptor
到source.output()
并实现其postSend()
to perform ((File) message.getPayload()).delete()
,这将在消息成功发送到活页夹目的地时发生。
推荐阅读
- android - 多个片段的一个 ViewModel 实例
- sap-cloud-sdk - 租户特定的弹性配置
- android - 主要要求是将 react-native-android 构建转换为 .aar(Android Archive) 库以在 Cordova 中重用
- autosar - 如何在 Vector AUTOSAR 配置工具中为 OS_Task 配置 MPU(内存保护单元)?
- ios - 如何在父视图中将项目添加到全局列表 var initted/wrapped 在 ObservableObject 中并让它在子视图中反映在 SwiftUI 中的 ForEach 中?
- django - 值错误:字段 'id' 需要一个数字,但得到了 'favicon.ico
- javascript - 为什么动画不适用于所有类元素?
- python - 让我的 Discord 机器人响应某个人
- flutter - 任务':audioplayers:compileDebugKotlin'的Flutter执行失败
- mysql - 求和类似值 MySQL 后每种类型的最大值