首页 > 解决方案 > DB关闭时如何将任何下载的文件路由到错误通道以及DB启动后如何处理它们?

问题描述

我已经在我的springboot应用程序中实现了轮询。此代码定期检查远程目录并从远程目录下载任何新文件。下载文件后,我们处理文件并将数据保存在数据库中。假设数据库已关闭我处理的文件数据未保存在 DB 中。每当 DB 再次启动时,我的文件不会被拾取进行处理,因此文件数据不会被保存。我在网上读到,如果我能够路由无法保存数据的文件DB 到错误通道,然后只要 DB 启动,文件就会得到处理

下面是我下载文件的代码

@Bean

  Properties configProperties(){

Properties config = new Properties();

config.setProperty("PreferredAuthentications", "password");

            return config;

        }

@Bean(name = PollerMetadata.DEFAULT_POLLER)

    public PollerMetadata pollRemoteDirectory() {

        PollerMetadata pollerMetadata = new PollerMetadata();

        pollerMetadata.setTrigger(
                new PeriodicTrigger(getSftpConfig().getPollingInterval(), TimeUnit.MINUTES));

        pollerMetadata.setMaxMessagesPerPoll(1000);

        return pollerMetadata;

        }

    @Bean

    SftpInboundFileSynchronizer syncRemoteFilesToLocalDirectory() throws MalformedURLException {

        SftpInboundFileSynchronizer fileSync = new VsSftpInboundFileSynchronizer(getSftpConfig().sftpSessionFactory());

        fileSync.setDeleteRemoteFiles(true);

        fileSync.setRemoteDirectory(getSftpConfig().getRemoteFilePath());

        CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new 
        CompositeFileListFilter<ChannelSftp.LsEntry>();

        compositeFileListFilter.addFilter(new SftpSimplePatternFileListFilter("*.xml"));

        fileSync.setFilter(compositeFileListFilter);

        return fileSync;

    }

@Bean

    @InboundChannelAdapter(value = "sftpChannel")

    public MessageSource setMessageSourceAndLocalDirectory() throws MalformedURLException {

        SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
                syncRemoteFilesToLocalDirectory());

        source.setLocalDirectory(new File(getSftpConfig().getArchiveFilePath()));

        source.setAutoCreateLocalDirectory(true);

        return source;
    }

@Bean

    @ServiceActivator(inputChannel = "sftpChannel")

    SftpFileHandler messageHandler() {

        return new SftpFileHandler();

    }

@Bean

    public static ResourceBundleMessageSource emailMessageSource() {

        final ResourceBundleMessageSource messageSource = new ResourceBundleMessageSource();

        messageSource.setBasename("mail/MailMessages");

        return messageSource;

    }

@Bean

    public static SpringTemplateEngine emailTemplateEngine() {

        final SpringTemplateEngine templateEngine = new SpringTemplateEngine();

        templateEngine.addTemplateResolver(textTemplateResolver());

        templateEngine.addTemplateResolver(htmlTemplateResolver());

        templateEngine.addTemplateResolver(stringTemplateResolver());

        templateEngine.setTemplateEngineMessageSource(emailMessageSource());

        return templateEngine;

    }

标签: spring-bootspring-integration

解决方案


轮询文件存储 FileSystemPersistentAcceptOnceFileListFilterSftpInboundFileSynchronizingMessageSource.

你需要像这样配置一个外部bean:

@Bean
public FileSystemPersistentAcceptOnceFileListFilter localFilefilter() {
    return new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "sftpFiles");
}

然后像这样注入它:

 source.setLocalFilter(localFilefilter());

当存储到数据库中的文件发生异常时,您只需要delete()使用无法存储的文件调用该过滤器的方法。见ResettableFileListFilter

/**
 * A {@link FileListFilter} that can be reset by removing a specific file from its
 * state.
 * @author Gary Russell
 * @since 4.1.7
 *
 */
public interface ResettableFileListFilter<F> extends FileListFilter<F> {

    /**
     * Remove the specified file from the filter so it will pass on the next attempt.
     * @param f the element to remove.
     * @return true if the file was removed as a result of this call.
     */
    boolean remove(F f);

}

我可能猜想你SftpFileHandler做了这么辛苦的工作,所以你可以adviceChain为它配置@ServiceActivator一个ExpressionEvaluatingRequestHandlerAdvice并配置它failureChannel来执行提到的delete()操作。

这样,将在下一个轮询周期再次从 SFTP 轮询未处理的文件。

实现目标的另一种方法是使用重试。为此,您可以RequestHandlerRetryAdvice出于同样的@ServiceActivator.adviceChain()原因使用 a。在这种情况下,相同的文件将一次又一次地尝试存储在 DB 中,而无需任何错误处理逻辑。

在参考手册中查看更多信息:https ://docs.spring.io/spring-integration/reference/html/#message-handler-advice-chain以及:https ://docs.spring.io/spring-integration /reference/html/#recovering-from-failures-2


推荐阅读