Rolling back a transaction in a channel

Problem description

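I have a Spring Integration flow that polls a folder for files and then tries to send each file's contents to a Kafka topic. The poller is configured with a transaction manager (PseudoTransactionManager()), so that if something goes wrong, the file is moved to an "error" folder.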

This is what the flow looks like:

  @Bean
  public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionParser parser = new SpelExpressionParser();
    ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
        new ExpressionEvaluatingTransactionSynchronizationProcessor();
    syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
    syncProcessor.setAfterCommitExpression(parser
        .parseExpression("payload.renameTo(new java.io.File(@inboundProcessedDirectory.path " +
            " + T(java.io.File).separator + payload.name))"));
    syncProcessor.setAfterRollbackExpression(
        parser.parseExpression("payload.renameTo(new java.io.File(@inboundFailedDirectory.path " +
            " + T(java.io.File).separator + payload.name))"));
    return new DefaultTransactionSynchronizationFactory(syncProcessor);
  }


  @Bean
  public IntegrationFlow inboundFileIntegration(MessageSource<File> fileReadingMessageSource) {
    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.id("inboundFileAdapter")
                .poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)
                    .transactionSynchronizationFactory(transactionSynchronizationFactory())
                    .transactional(transactionManager())))
        // END POLLER CONFIGURATION
        .transform(fileToObjectTransformer)
        .channel(INBOUND_BBS_CHANNEL)
        .get();
  }



  @Bean
  public IntegrationFlow toKafka(KafkaTemplate<?, ?> kafkaTemplate) {
    return IntegrationFlows.from(INBOUND_BBS_CHANNEL)
        .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
            .topic(this.properties.getEntitlement().getTopic())
            .sync(true)
            .messageKey(this.properties.getEntitlement().getMessageKey()))
        .get();
  }

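The flow works perfectly in the "happy" scenario. However, if I kill the Kafka cluster, for example, I expect the error to be propagated back to the transaction manager so that the file is moved to the "error" folder (and the problem is logged using a wiretap). Instead, when a Kafka-related error occurs, I get: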

Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException:

followed by:

Caused by: java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String.

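The file placed in the initial folder is not moved. I think the problem is related to the fact that the "Kafka channel" is one-way, so the exception is not propagated back to the initial flow where the txManager lives. How can I make the txManager "roll back" the file?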
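
The behaviour I expect can be modelled in plain Java, independent of Spring Integration. This is only a sketch, assuming the hand-off between the two flows happens on the polling thread. FileSync, pollOnce and run are hypothetical names rather than framework APIs, and the sketch swallows the exception after the rollback callback purely for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RollbackPropagationSketch {

    // Hypothetical stand-in for the after-commit / after-rollback expressions.
    interface FileSync {
        void afterCommit(String fileName);
        void afterRollback(String fileName);
    }

    // Calls the downstream handler on the caller's own thread, so an exception
    // thrown downstream unwinds back into this method.
    static void pollOnce(String fileName, Consumer<String> downstream, FileSync sync) {
        try {
            downstream.accept(fileName);
        } catch (RuntimeException e) {
            sync.afterRollback(fileName);
            return; // swallowed here only to keep the sketch short
        }
        sync.afterCommit(fileName);
    }

    // Runs one poll and reports where the file ended up.
    static List<String> run(boolean brokerAvailable) {
        List<String> events = new ArrayList<>();
        FileSync sync = new FileSync() {
            @Override public void afterCommit(String f) { events.add(f + " -> processed"); }
            @Override public void afterRollback(String f) { events.add(f + " -> failed"); }
        };
        // Simulated send step: fails when the (hypothetical) broker is down.
        Consumer<String> send = f -> {
            if (!brokerAvailable) {
                throw new IllegalStateException("Failed to send " + f);
            }
        };
        pollOnce("a.txt", send, sync);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

In this model the failing send ends in the rollback callback, which is the outcome I would like for the polled file.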

--- UPDATE ---

Full stack trace

2018-10-04 17:46:32.332 ERROR 7300 --- [ taskExecutor-4] o.s.k.s.LoggingProducerListener          : Exception thrown when sending a message with key='bbs.key' and payload='{"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": [{"accountId": "030", "applic...' to topic bbs.topic:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.

2018-10-04 17:46:32.349  WARN 7300 --- [ad | producer-1] o.a.k.c.NetworkClient                    : [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
2018-10-04 17:46:32.348  INFO 7300 --- [ taskExecutor-4] ---- DONE ----                           : GenericMessage [payload={"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": []
2018-10-04 17:46:32.364 ERROR 7300 --- [ taskExecutor-4] o.s.i.h.LoggingHandler                   : All attempts to deliver Message to MessageHandlers failed.; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [toKafka.org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms., failedMessage=GenericMessage [payload={"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": [{"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "45T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}]}, headers={errorChannel=error222, file_originalFile=c:\UBS\Dev\tmp\cosima-producer\inbound\read\full-base-lca17_126.txt, id=b28bb489-b62e-6861-5c1b-c4665fd72e62, file_name=full-base-lca17_126.txt, file_relativePath=full-base-lca17_126.txt, timestamp=1538667991063}]. Multiple causes:
    error occurred in message handler [toKafka.org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
    error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@180d04d9] (inboundFileIntegration.org.springframework.integration.config.ConsumerEndpointFactoryBean#3)]; nested exception is java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String.
See below for the stacktrace of the first cause.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [toKafka.org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms., failedMessage=GenericMessage [payload={"messageKey": ]}, headers={errorChannel=error222, file_originalFile=c:\UBS\Dev\tmp\cosima-producer\inbound\read\full-base-lca17_126.txt, id=b28bb489-b62e-6861-5c1b-c4665fd72e62, file_name=full-base-lca17_126.txt, file_relativePath=full-base-lca17_126.txt, timestamp=1538667991063}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:144)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:227)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:290)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:197)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy93.call(Unknown Source)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:391)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:206)
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:133)
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:290)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    ... 73 more
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
    at org.springframework.kafka.core.KafkaTemplate$1.onCompletion(KafkaTemplate.java:370)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:827)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:363)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:355)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:206)
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:257)
    ... 74 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@180d04d9] (inboundFileIntegration.org.springframework.integration.config.ConsumerEndpointFactoryBean#3)]; nested exception is java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String., failedMessage=GenericMessage [payload={"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": [{"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "45T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}]}, headers={file_originalFile=c:\UBS\Dev\tmp\cosima-producer\inbound\read\full-base-lca17_126.txt, id=3ebb9873-e1c2-8ec3-b541-98106af7bf53, file_name=full-base-lca17_126.txt, file_relativePath=full-base-lca17_126.txt, timestamp=1538667991043}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:144)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:227)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:290)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:197)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy93.call(Unknown Source)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:391)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String.
    at org.springframework.integration.handler.ExpressionCommandMessageProcessor.processMessage(ExpressionCommandMessageProcessor.java:78)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    ... 73 more

Tags: spring-integration, spring-integration-dsl
