Spring IntegrationFlow CompositeFileListFilter not working

Question

I have two filters, regexFilter and lastModified.

return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .regexFilter(config.getRegexFilter())
            .filter(new LastModifiedLsEntryFileListFilter())
            .remoteDirectory(config.getInboundDirectory())
            , e -> e.poller(Pollers.fixedDelay(60_000)
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {

    })))

From searching on Google, I learned that I have to use CompositeFileListFilter for the regex, so I changed my code to

.filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))

It compiles, but at runtime it throws an error and the channel stops. The same error occurs with

.filter(ftpPersistantFilter(config.getRegexFilter()))
.
.
.

public CompositeFileListFilter ftpPersistantFilter(String regexFilter) {
        CompositeFileListFilter filters = new CompositeFileListFilter();
            filters.addFilter(new FtpRegexPatternFileListFilter(regexFilter));
        return filters;
    }

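I only want to filter by file name. There are 2 flows for the same remote folder, and both poll with the same cron, but each should pick up only its own relevant files.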
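To make the requirement concrete, here is a minimal sketch of two name patterns splitting one remote listing between two flows. It uses only java.util.regex and does not touch Spring Integration. The class name FileNameRouting, the patterns, and the file names are all hypothetical and not taken from the original post. It assumes a full match on the file name is wanted.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class FileNameRouting {

    // Returns the names that fully match the given regex, in listing order.
    static List<String> select(String regex, List<String> remoteNames) {
        Pattern pattern = Pattern.compile(regex);
        return remoteNames.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical listing of the shared remote folder.
        List<String> remoteNames = List.of("orders_1.csv", "invoices_1.csv", "readme.txt");

        // Flow 1 and flow 2 each use a pattern that does not overlap with the other.
        System.out.println(select("orders_.*\\.csv", remoteNames));   // prints [orders_1.csv]
        System.out.println(select("invoices_.*\\.csv", remoteNames)); // prints [invoices_1.csv]
    }
}
```

As long as the two patterns do not overlap, each flow sees only its own files, and a file matching neither pattern is left alone.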

EDIT: Adding the LastModifiedLsEntryFileListFilter. It works fine, but I am adding it as requested.

public class LastModifiedLsEntryFileListFilter implements FileListFilter<LsEntry> {

private final Logger log = LoggerFactory.getLogger(LastModifiedLsEntryFileListFilter.class);
private static final long DEFAULT_AGE = 60;

private volatile long age = DEFAULT_AGE;

private volatile Map<String, Long> sizeMap = new HashMap<String, Long>();


public long getAge() {
    return this.age;
}

public void setAge(long age) {
    setAge(age, TimeUnit.SECONDS);
}

public void setAge(long age, TimeUnit unit) {
    this.age = unit.toSeconds(age);
}

@Override
public List<LsEntry> filterFiles(LsEntry[] files) {

    List<LsEntry> list = new ArrayList<LsEntry>();

    long now = System.currentTimeMillis() / 1000;

    for (LsEntry file : files) {

        if (file.getAttrs()
                .isDir()) {
            continue;
        }
        String fileName = file.getFilename();
        Long currentSize = file.getAttrs().getSize();
        Long oldSize = sizeMap.get(fileName);

        if(oldSize == null || currentSize.longValue() != oldSize.longValue() ) {
            // putting size in map, will verify in next iteration of scheduler
            sizeMap.put(fileName, currentSize);
            log.info("[{}] old size [{}]  increased to [{}]...", file.getFilename(), oldSize, currentSize);
            continue;
        }

        int lastModifiedTime = file.getAttrs()
            .getMTime();

        if (lastModifiedTime + this.age <= now ) {
            list.add(file);
            sizeMap.remove(fileName);
        } else {
            log.info("File [{}] is still being uploaded...", file.getFilename());
        }
    }
    return list;
}

}

PS: When I tested the regex filter, I removed LastModifiedLsEntryFileListFilter just for simplicity. So my final flow is

return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))
            //.filter(new LastModifiedLsEntryFileListFilter())
            .remoteDirectory(config.getInboundDirectory()),
            e -> e.poller(Pollers.fixedDelay(60_000)
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
                try {
                    this.destroy(String.valueOf(config.getId()));
                    configurationService.removeConfigurationChannelById(config.getId());
                    // logging here
                } catch (Exception ex1) {
                }
            }))).publishSubscribeChannel(s -> s
            .subscribe(f -> {

                f.handle(Sftp.outboundAdapter(outboundSftp)
                        .useTemporaryFileName(false)
                        .autoCreateDirectory(true)
                        .remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));

            })
            .subscribe(f -> {
                if (doArchive) {
                    f.handle(Sftp.outboundAdapter(inboundSftp)
                            .useTemporaryFileName(false)
                            .autoCreateDirectory(true)
                            .remoteDirectory(config.getInboundArchiveDirectory()));
                } else {
                    f.handle(m -> {
                    });
                }

            })
            .subscribe(f -> f
            .handle(m -> {

                // I am handling exception here
            })
            ))
            .get();

Here is the exception

2020-01-27 21:36:55,731 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.2.subFlow#0.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2020-01-27 21:36:55,731 INFO o.s.i.c.DirectChannel - Channel 'application.2.subFlow#2.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1

EDIT: Passing the regex to LastModifiedLsEntryFileListFilter and handling it there works for me. When I use any other RegexFilter inside CompositeFileListFilter, it throws an error.

.filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))

Tags: spring-integration, spring-integration-dsl, spring-integration-sftp

Answer


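Please show your final flow. I don't see that you use LastModifiedLsEntryFileListFilter in the CompositeFileListFilter... You definitely can't use regexFilter() and filter() together: the last one wins. To avoid confusion, we suggest using a single filter() and composing all of them with a CompositeFileListFilter or ChainFileListFilter.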

Also, what is the error you are mentioning, please?
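A minimal sketch of that suggestion, assuming Spring Integration 5.x with the JSch-based SFTP module on the classpath (where the SFTP file type is com.jcraft.jsch.ChannelSftp.LsEntry). The helper class SftpFilters and its method nameThenStable are hypothetical names introduced only for illustration, and LastModifiedLsEntryFileListFilter is the class from the question, assumed to be in the same package. SftpRegexPatternFileListFilter is used because it is the regex filter typed for LsEntry. RegexPatternFileListFilter is typed for java.io.File and FtpRegexPatternFileListFilter for FTPFile, so with a raw CompositeFileListFilter they compile but would probably fail at runtime on SFTP entries. That reading is not confirmed in this thread, because the actual error was never posted.

```java
import com.jcraft.jsch.ChannelSftp.LsEntry;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter;

// Hypothetical helper, not part of the original post or of Spring Integration.
public final class SftpFilters {

    private SftpFilters() {
    }

    // Builds the single filter to pass to .filter(...); regexFilter(...) is not used at all.
    public static FileListFilter<LsEntry> nameThenStable(String regex) {
        // In a chain, each filter only sees the files accepted by the previous one.
        ChainFileListFilter<LsEntry> chain = new ChainFileListFilter<>();
        // First narrow the listing by file name.
        chain.addFilter(new SftpRegexPatternFileListFilter(regex));
        // Then apply the size/age check from the question to the remaining files.
        chain.addFilter(new LastModifiedLsEntryFileListFilter());
        return chain;
    }
}
```

In the flow, both .regexFilter(config.getRegexFilter()) and .filter(new LastModifiedLsEntryFileListFilter()) would then be replaced by one call, .filter(SftpFilters.nameThenStable(config.getRegexFilter())). A CompositeFileListFilter<LsEntry> could be used the same way. The difference is that every filter in a composite is given the full listing, so the size map in LastModifiedLsEntryFileListFilter would also track files that belong to the other flow.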

