首页 > 解决方案 > 将 DelegatingSessionFactory 与 RemoteFileTemplate.execute(SessionCallback) 一起使用

问题描述

我正在尝试声明多个 SFTP 会话,将它们包装在 DelegatingSessionFactory 中,然后在 cron 作业期间使用 SftpRemoteFileTemplate.execute(...) 。

在执行部分,代码非常简单,它已经用于单个会话,但我想将它扩展到多个可能的会话。

下面我扩展了我的单会话代码。我只是复制了方法以供参考。最后,我将展示我认为新方法的外观。

public class XSession extends SftpSession {

    @Scheduled(cron = "${sftp.scan.x.schedule}")
    void scan() {
        List<FileHistoryEntity> fileList = template.execute(this::processFiles);
        ...
    }
    
    private List<FileHistoryEntity> processFiles(Session<ChannelSftp.LsEntry> session) {
        List.of(session.list(this.remoteDir)).forEach(file -> doWhatever());
        ...
    }
}

但现在我有多个会话。所以我声明了以下类:

@Slf4j
@Configuration
@RequiredArgsConstructor
public class DelegateSftpSessionHandler {

    private final SessionFactory<ChannelSftp.LsEntry> session1;
    private final SessionFactory<ChannelSftp.LsEntry> session2;
    private final SessionFactory<ChannelSftp.LsEntry> session3;
    private final SessionFactory<ChannelSftp.LsEntry> session4;
    private final SessionFactory<ChannelSftp.LsEntry> session5;
    
    @RequiredArgsConstructor
    public enum DelegateSessionConfig {
        SESSION_1("IN_REALITY_A_RELEVANT_NAME_1");
        SESSION_2("IN_REALITY_A_RELEVANT_NAME_2");
        SESSION_3("IN_REALITY_A_RELEVANT_NAME_3");
        SESSION_4("IN_REALITY_A_RELEVANT_NAME_4");
        SESSION_5("IN_REALITY_A_RELEVANT_NAME_5");

        public final String threadKey;
    }

    @Bean
    @Primary
    public DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSessionFactory() {
        Map<Object, SessionFactory<ChannelSftp.LsEntry>> sessionMap = new HashMap<>();
        sessionMap.put(DelegateSessionConfig.SESSION_1.threadKey, session1);
        sessionMap.put(DelegateSessionConfig.SESSION_2.threadKey, session2);
        sessionMap.put(DelegateSessionConfig.SESSION_3.threadKey, session3);
        sessionMap.put(DelegateSessionConfig.SESSION_4.threadKey, session4);
        sessionMap.put(DelegateSessionConfig.SESSION_5.threadKey, session5);

        DefaultSessionFactoryLocator<ChannelSftp.LsEntry> sessionLocator = new DefaultSessionFactoryLocator<>(sessionMap);
        return new DelegatingSessionFactory<>(sessionLocator);
    }

    @Bean
    SftpRemoteFileTemplate ftpRemoteFileTemplate(DelegatingSessionFactory<ChannelSftp.LsEntry> dsf) {
        return new SftpRemoteFileTemplate(dsf);
    }
}

Ting 是,我不知道这是如何工作的,而且 spring sftp / fpt 文档也不清楚。该代码几乎没有记录。我只是猜测。我认为我必须执行以下操作:

public class XSession extends SftpSession {

    @Autowire
    DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSessionFactory;
    
    @Autowired
    SftpRemoteFileTemplate template;

    @Scheduled(cron = "${sftp.scan.x.schedule}") // x == SESSION_1
    @Async // for thread key
    void scan() {
        delegatingSessionFactory.setThreadKey(DelegateSessionConfig.SESSION_1.threadKey);
        
        // because thread key changes the session globally? So I don't need specify
        // which session this template is working with???
        List<FileHistoryEntity> fileList = template.execute(this::processFiles);
       
        ...
       
        delegatingSessionFactory.clearThreadKey();
    }
    
    private List<FileHistoryEntity> processFiles(Session<ChannelSftp.LsEntry> session) {
        List.of(session.list(this.remoteDir)).forEach(file -> doWhatever());
        ...
    }
}

我是根据下面的链接说的,github spring integration test

老实说,我几乎不明白发生了什么。但似乎设置线程键,全局更改会话。

我唯一的另一个想法是……按需创建 RemoteFileTemplate

public static SftpRemoteFileTemplate getTemplateFor(DelegatingSessionFactory<ChannelSftp.LsEntry> dsf, DelegateSessionConfig session) {
    return new SftpRemoteFileTemplate(dsf.getFactoryLocator().getSessionFactory(session.threadKey));
}

标签: springspring-integration

解决方案


它不会在全局范围内设置它。这就是ThreadLocal变量的工作方式:您在某个线程中设置了一个值,并且只有该线程可以看到它。如果您同时使用同一个对象,其他线程看不到该值,因为它不属于它们的线程状态。

不确定您关心的是什么,但扩展SftpSession自定义逻辑的模式是不正确的。您应该考虑改用 an SftpRemoteFileTemplate.execute(SessionCallback<F, T> callback),但无论如何都必须将线程键设置为 a DelegatingSessionFactory before并且在您要调用它的同一线程中execute()


推荐阅读