spring-integration-dsl - Spring Integration DSL: lambda 返回消息在句柄方法中,例如使用 DelegatingSessionFactory?
问题描述
动机:我需要在路由到 Sftp 出站网关之前为 DelegatingSessionFactory 设置 threadKey,然后再取消设置 threadKey。
根据租户的不同,我需要使用不同的 Sftp 用户帐户。用户帐户是我的 application.yml 中的配置问题,我不想为每个新租户编写单独的路由。
public IntegrationFlow aDynamicSftpFlow() {
f -> f
.handle(tenantSessionDefine()) // how can I use a lambda instead?
.handle(Sftp.outboundGateway(delegatingSessionFactory, ...))
.handle(...) // undefine sftp session
}
设置 threadKey 需要一个Message<?>
,而不仅仅是有效负载和标头。所以我使用一个bean,因为它需要一条消息:
public class TenantSessionDefine {
private DelegatingSessionFactory delegatingSessionFactory;
public TenantSessionDefine(DelegatingSessionFactory delegatingSessionFactory) {
this.delegatingSessionFactory = delegatingSessionFactory;
}
public Message<?> defineSession(Message<?> message) {
return delegatingSessionFactory.setThreadKey(message, message.getHeaders()
.get("tenantId", String.class));
// used by SessionFactoryLocator
}
}
我想把它写成一个 lambda,如
.handle(message -> delegatingSessionFactory.setThreadKey(message,
message.getPayload().getTenant())
但这并不容易。可以使用的 lambdahandle()
结束Message<T>
流程,因为它是一个 void 函数(MessageHandler
功能接口)。另一个 lambda 是 GenericHandler,它不会结束流,但它需要有效负载和标头,而不是消息。
这只是一个例子,时不时地我希望我可以handle()
在不结束流程的情况下使用 lambda 中的消息。我怎样才能做到这一点?
更新
这DelegatingSessionFactory
不是一个特别适合的例子。由于设置和清除线程密钥应该发生在 sftp 调用之前和之后,因此建议比在调用之前和之后定义处理程序更合适。
解决方案
知道了。javadochandle()
说
handle(Class, GenericHandler)
如果您需要访问整个消息,请使用。
类参数必须是Message.class
:
.handle(Message.class,
(message, headers) -> sftpSessionFactory
.setThreadKey(message, headers.get("tenantId")))
推荐阅读
- bash - 运行多个命令并在其中一个失败时让脚本失败
- android - 带有圆帽的笔画在 EditText 中显示一半
- php - 如何在本地环境中向手机发送短信?
- python-3.x - 无法使用 TensorFlow 数据集拆分疟疾数据集
- javascript - 使用 tsconfig.json 从 node_modules 导入外部类型定义
- android - 这个广告倒计时方法是如何通过 jadx 从 apk 工作的?
- api - RAML 文件中的命名空间
- python - Python 分类器
- python-3.x - pyspark 是否会更改优化指令的顺序?
- amazon-web-services - Spring Boot Tomcat 访问 Cloudwatch Docker 日志