首页 > 解决方案 > Spring Integration DSL: lambda 返回消息在句柄方法中,例如使用 DelegatingSessionFactory?

问题描述

动机:我需要在路由到 Sftp 出站网关之前为 DelegatingSessionFactory 设置 threadKey,然后再取消设置 threadKey。

根据租户的不同,我需要使用不同的 Sftp 用户帐户。用户帐户是我的 application.yml 中的配置问题,我不想为每个新租户编写单独的路由。

public IntegrationFlow aDynamicSftpFlow() {
    f -> f
        .handle(tenantSessionDefine()) // how can I use a lambda instead?
        .handle(Sftp.outboundGateway(delegatingSessionFactory, ...))
        .handle(...) // undefine sftp session
}

设置 threadKey 需要一个Message<?>,而不仅仅是有效负载和标头。所以我使用一个bean,因为它需要一条消息:

public class TenantSessionDefine {


    private DelegatingSessionFactory delegatingSessionFactory;

    public TenantSessionDefine(DelegatingSessionFactory delegatingSessionFactory) {
        this.delegatingSessionFactory = delegatingSessionFactory;
    }

    public Message<?> defineSession(Message<?> message) {
        return delegatingSessionFactory.setThreadKey(message, message.getHeaders()
            .get("tenantId", String.class)); 
// used by SessionFactoryLocator
    }
}

我想把它写成一个 lambda,如

.handle(message -> delegatingSessionFactory.setThreadKey(message, 
    message.getPayload().getTenant())

但这并不容易。可以使用的 lambdahandle()结束Message<T>流程,因为它是一个 void 函数(MessageHandler功能接口)。另一个 lambda 是 GenericHandler,它不会结束流,但它需要有效负载和标头,而不是消息。

这只是一个例子,时不时地我希望我可以handle()在不结束流程的情况下使用 lambda 中的消息。我怎样才能做到这一点?

更新

DelegatingSessionFactory不是一个特别适合的例子。由于设置和清除线程密钥应该发生在 sftp 调用之前和之后,因此建议比在调用之前和之后定义处理程序更合适。

标签: spring-integration-dsl

解决方案


知道了。javadochandle()

handle(Class, GenericHandler)如果您需要访问整个消息,请使用。

类参数必须是Message.class

.handle(Message.class, 
    (message, headers) -> sftpSessionFactory
        .setThreadKey(message, headers.get("tenantId")))

推荐阅读