首页 > 解决方案 > spring cloud stream文件源码app - 子目录下已处理文件和轮询文件的历史

问题描述

我在管道开始时使用 Spring Cloud Stream File Source 应用程序构建数据管道。我需要一些帮助来解决一些缺失的功能

我的文件源应用程序(基于 org.springframework.cloud.stream.app:spring-cloud-starter-stream-source-file)运行良好,除了缺少我需要帮助的功能。我需要

  1. 轮询和发送消息后删除文件
  2. 轮询子目录

关于第 1 项,我读到文件源应用程序中不存在删除功能(它在 sftp 源中可用)。每次重启应用都会重新拾取之前处理过的文件,是否可以将处理过的文件历史永久化?有没有简单的替代方案?

标签: springspring-cloud-stream

解决方案


为了支持这些要求,您肯定需要修改上述文件源项目的代码:https ://docs.spring.io/spring-cloud-stream-app-starters/docs/Einstein.BUILD-SNAPSHOT/reference/htmlsingle/ #_patching_pre_built_applications

我建议分叉项目并按原样从 GitHub 轮询它,因为您要修改项目的现有代码。然后,您按照上述文档中的说明如何构建与 SCDF 环境兼容的目标绑定器特定工件。

现在关于问题:

要轮询相同文件模式的子目录,您需要RecursiveDirectoryScannerFiles.inboundAdapter():

/**
 * Specify a custom scanner.
 * @param scanner the scanner.
 * @return the spec.
 * @see FileReadingMessageSource#setScanner(DirectoryScanner)
 */
public FileInboundChannelAdapterSpec scanner(DirectoryScanner scanner) {

请注意,所有的都filters必须在此配置DirectoryScanner。否则会有警告:

    // Check that the filter and locker options are _NOT_ set if an external scanner has been set.
    // The external scanner is responsible for the filter and locker options in that case.
    Assert.state(!(this.scannerExplicitlySet && (this.filter != null || this.locker != null)),
            () -> "When using an external scanner the 'filter' and 'locker' options should not be used. " +
                    "Instead, set these options on the external DirectoryScanner: " + this.scanner);

要跟踪文件,最好考虑使用FileSystemPersistentAcceptOnceFileListFilter基于外部持久性存储的ConcurrentMetadataStore实现:https ://docs.spring.io/spring-integration/reference/html/#metadata-store 。必须使用 this 而不是 that preventDuplicates(),因为也FileSystemPersistentAcceptOnceFileListFilter确保我们只使用一次逻辑。

发送后删除文件可能不是这种情况,因为您可以File按原样发送,并且它必须在另一端可用。

此外,您可以将 a 添加ChannelInterceptorsource.output()并实现其postSend()to perform ((File) message.getPayload()).delete(),这将在消息成功发送到活页夹目的地时发生。


推荐阅读