首页 > 解决方案 > 为什么我的处理程序方法在定义为 lambda 时没有被触发?

问题描述

我正在IntegrationFlow以这种方式使用 DSL 语法定义从 SFTP 到 S3 的流:

return IntegrationFlows.from(Sftp.inboundStreamingAdapter(remoteFileTemplate)
                        .remoteDirectory("remoteDirectory"),
                e -> e.poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
                .transform(new StreamTransformer())
                .handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
                .get();
private S3MessageHandler s3UploadMessageHandler(String folderPath, String spelFileName) {
        S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3, s3ConfigProperties.getBuckets().getCardManagementData());
        s3MessageHandler.setKeyExpression(new SpelExpressionParser().parseExpression(String.format("'%s/'.concat(%s)", folderPath, spelFileName)));
        s3MessageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
        return s3MessageHandler;
    }

它按预期工作:文件很好地上传到我的 S3 存储桶。但是,我想避免SPEL语法,并将消息中的标头注入s3uploadMessageHandler方法,这样我可以使用简单的方法在方法ValueExpression中设置。为此,我改变了keyExpressions3UploadMessageHandler

handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3

handle(m -> s3UploadMessageHandler(outputFolderPath, (String) m.getHeaders().get("file_remoteFile"))) // Upload on S3

但是现在这个处理程序似乎不再被触发了。日志中没有错误,我从日志中知道 SFTP 轮询仍在工作。

我试图找到这背后的原因,我发现当在 中输入句柄方法时IntegrationFlowdefinition.javamessageHandler类类型不同:它是一个S3MessageHandler没有 lambda 的MyCallingClass$lambda调用时,一个使用 lambda 表达式调用的时候。

我错过了什么让我的场景工作?

标签: javalambdaspring-integrationspring-integration-dslspring-integration-aws

解决方案


有两种处理消息的方法。一种是通过MessageHandler实现——这是最有效的方法,并且在通道适配器实现的框架中完成,就像那样S3MessageHandler。另一种方法是 POJO 方法调用 - 当您不需要担心任何框架接口时,这是最用户友好的方法。

因此,当您像这样使用它时,.handle(s3UploadMessageHandler(...))您指的是 aMessageHandler并且框架知道必须为MessageHandler它注册一个 bean,因为您s3UploadMessageHandler()的不是@Bean.

当您将其用作 lambda 时,框架将其视为 POJO 方法调用,并且MethodInvokingMessageHandlerS3MessageHandler.

无论如何,即使您将您的方法更改s3UploadMessageHandler()为一种@Bean方法,它也不会起作用,因为您不让框架调用S3MessageHandler.handleMessage(). 您在这里所做的只是private在运行时调用该方法以针对每个请求消息创建一个S3MessageHandler实例:在其中MethodInvokingMessageHandler调用您的 lambda,handleMessage()仅此而已 - S3 不会发生任何事情。

在这里ValueExpression无法为您提供帮助,因为您需要针对每条请求消息评估目标文件。因此,您需要一个运行时表达式。确实没有错new SpelExpressionParser().parseExpression()。仅仅因为我们别无选择,只能有一个无状态S3MessageHandler,并且不要在运行时在每个请求上重新创建它,就像你试图用那个可疑的 lambda 和ValueExpression.


推荐阅读