java - 为什么在默认 Spring Cloud Stream 配置中更改 Spring Integration 消息方法处理行为
问题描述
我有一个已经使用 Spring Integration(最新 5.1.6)的应用程序。并配置了以下流程:
@Configuration
public class SomeConfigClass {
...
@MessagingGateway(name = "someGateway")
interface Gateway {
@Gateway(requestChannel = "inboundChannel")
@Payload("T(java.time.ZonedDateTime).now()")
void replicate();
}
@Bean
public DirectChannel inboundChannel() {
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow someFlow() {
return IntegrationFlows.from(inboundChannel())
.handle(someHandler())
.channel(OUT)
.get();
}
@Bean
public SomeHandler someHandler() {
return new SomeHandler();
}
}
和
public class SomeHandler implements GenericHandler<Object> {
@Override
public Message<List<String>> handle(final Object payload,
final MessageHeaders headers) {
...
return MessageBuilder
.withPayload(someList)
.copyHeaders(headers)
.setHeader("custom", customHeader)
.build();
}
}
一切正常。
如果我尝试integrationArgumentResolverMessageConverter
在初始化的上下文中找到 bean,我会看到下一个转换器:
MappingJackson2MessageConverter
ByteArrayMessageConverter
ObjectStringMessageConverter
GenericMessageConverter
之后,我将 Spring Cloud Stream 2.1.2 依赖项和 Kinesis Binder 1.2.0 添加到我的 pom 依赖项中。默认配置绑定。
应用程序启动,但是当我尝试处理现有流程时,它失败了,例如:
EL1004E:方法调用:在 org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:225) 的类型 packageSomeHandler 上找不到方法句柄(java.time.ZonedDateTime,org.springframework.messaging.MessageHeaders) org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134) at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:54) at org.springframework.expression.spel .ast.MethodReference$MethodValueRef.getValue(MethodReference.java:390) 在 org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:90) 在 org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue (SpelNodeImpl.java:114) 在 org.springframework.expression.spel.standard。SpelExpression.getValue(SpelExpression.java:365) at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:172) at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:160) at org .springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:664) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:655) at org.springframework.integration.handler.support .MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:491) 在 org.springframework.integration.handler.support.MessagingMethodInvokerHelper。org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:106) 的流程(MessagingMethodInvokerHelper.java:362) .integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java :115) 在 org.springframework 的 org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)。integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java: 453)在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)在 org.springframework.messaging.core.GenericMessagingTemplate .doSend(GenericMessagingTemplate.java:166) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) 在 org.springframework.messaging.core.AbstractMessageSendingTemplate。在 org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) 在 org.springframework 发送(AbstractMessageSendingTemplate.java:109) .integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:413) at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:533) at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java :473) 在 org.springframework.aop.framework.ReflectiveMethodInvocation 的 org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:463)。在 com.sun.proxy.$Proxy444.replicate 的 org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) 继续(ReflectiveMethodInvocation.java:186)(未知来源)
当我尝试integrationArgumentResolverMessageConverter
从初始化的上下文中获取相同的 bean 时,我看到了下一条链:
ApplicationJsonMessageMarshallingConverter
TupleJsonMessageConverter
ByteArrayMessageConverter
ObjectStringMessageConverter
JavaSerializationMessageConverter
KryoMessageConverter
JsonUnmarshallingConverter
而且没有GenericMessageConverter
。据我了解,由于缺少此转换器,因此无法转换(如果我错了,请纠正我)。
为什么我只添加 Spring Cloud Stream 默认配置时行为会有所不同?或者如何为特定流程定制使用转换器链?或者如何为不同的集成流程保持消息对话行为?
更新:所以当我调查spring cloud stream时,不仅重新定义了默认集成MessageConverter
s,而且还重新定义了默认HandlerMethodArgumentResolver
s,它用于将方法参数与消息映射..
在添加 Spring Cloud Stream 之前:
HeaderMethodArgumentResolver
HeadersMethodArgumentResolver
MessageMethodArgumentResolver
PayloadExpressionArgumentResolver
NullAwarePayloadArgumentResolver
PayloadsArgumentResolver
MapArgumentResolver
PayloadArgumentResolver
添加 Spring Cloud Stream 后:
SmartPayloadArgumentResolver
SmartMessageMethodArgumentResolver
HeaderMethodArgumentResolver
HeadersMethodArgumentResolver
PayloadExpressionArgumentResolver
NullAwarePayloadArgumentResolver
PayloadExpressionArgumentResolver
PayloadsArgumentResolver
MapArgumentResolver
有两个已弃用SmartPayloadArgumentResolver
和SmartMessageMethodArgumentResolver
修复,用于从byte[]
有效负载转换为Object
. 但是我不明白为什么有两个PayloadExpressionArgumentResolver
?
主要问题:为什么 Spring Cloud Stream 默认应用程序上下文会影响 Spring Integration 默认应用程序上下文,我之前认为 Stream 的解析器/转换器仅与与流目标通道链接的消息端点相关......
解决方案
我不确定为什么 Stream 会丢弃该转换器(可能是一个错误,可能会在那里打开一个 GitHub 问题),但我相信您可以@StreamMessageConverter
@Bean
按照流文档中的讨论将其添加回来。
推荐阅读
- python - 在 setUp() 中创建异步事件是 aiounittest 的一个陷阱
- html - 防止元素改变附近元素的位置
- haskell - 如何使用 Haskell/Aeson 中的类型函数解析多态值?
- vb.net - 如何将命令行开关添加到 VB.NET WinForms 应用程序(从 CMD 运行时)?
- node.js - 如何从管理员 ui 中删除 Keystonejs 5 github repo url?
- android - 向 Firebase 中的所有用户推送通知
- python - 为什么这个 ML 模型的准确率为零?
- javascript - 为什么我的 React 无法读取内部 api (flask) 数据?
- c++ - 生成随机二进制数序列的问题
- airflow - 基于操作员类型的气流并行度