spring-boot - Spring集成java DSL Dispatcher没有订阅者
问题描述
即使将订阅者添加到我的频道(即 orderDeliveredChannel)后,我仍然会收到以下异常。
我正在将 XML 配置转换为 Spring Integration java DSL。
以下是我在运行应用程序时遇到的异常。
2019-11-10 13:47:14.520 INFO 24598 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}]
2019-11-10 13:47:14.522 ERROR 24598 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.orderDeliveredChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}], failedMessage=GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:444)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:318)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:266)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:229)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:851)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processMessageForGroup(AbstractCorrelatingMessageHandler.java:498)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:471)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:444)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:318)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:266)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:229)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:133)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:396)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:380)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 48 more
这是我的spring集成配置
package study.pattern.integration.lab9.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageHandler;
import lombok.extern.slf4j.Slf4j;
import study.pattern.integration.lab9.domain.ItemType;
import study.pattern.integration.lab9.domain.Order;
import study.pattern.integration.lab9.domain.OrderItem;
import study.pattern.integration.lab9.service.OrderDelivery;
import study.pattern.integration.lab9.service.OrderItemsProcessor;
@EnableIntegration
@Configuration
@IntegrationComponentScan
@Slf4j
public class IntegrationConfiguration {
@Autowired
OrderItemsProcessor processor;
@Autowired
OrderDelivery orderDelivery;
@Bean
@Qualifier("orderChannel")
public DirectChannel orderChannel() {
return MessageChannels.direct().get();
}
@Bean
@Qualifier("orderItemsChannel")
public DirectChannel orderItemsChannel() {
return MessageChannels.direct().get();
}
/**
*
*
* QueueChannel: Implements PollableChannel.
* There’s one endpoint connected to the channel, no subscribers. This communication is asynchronous;
* the receiver will retrieve the message through a different thread. How it works:
The producer sends the message to the channel.
The channel queues the message.
The consumer actively retrieves the message (active receiver).
* @return
*/
@Bean
@Qualifier("musicItemsChannel")
public QueueChannel musicItemsChannel() {
return MessageChannels.queue().get();
}
@Bean
@Qualifier("softwareItemsChannel")
public QueueChannel softwareItemsChannel() {
return MessageChannels.queue().get();
}
@Bean
@Qualifier("booksItemChannel")
public QueueChannel booksItemsChannel() {
return MessageChannels.queue().get();
}
@Bean
@Qualifier("orderItemsProcessed")
public DirectChannel orderItemsProcessedChannel() {
return MessageChannels.direct().get();
}
@Bean
@Qualifier("orderDelivered")
public DirectChannel orderDeliveredChannel() {
return MessageChannels.direct().get();
}
@Bean
@ServiceActivator(inputChannel = "myLogChannel")
public MessageHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler(LoggingHandler.Level.INFO.name());
loggingHandler.setLoggerName("logging");
return loggingHandler;
}
@Bean
IntegrationFlow processOrderFlow() {
return IntegrationFlows
.from(orderChannel())
.split(Order.class , a -> a.getOrderItems())
.channel(orderItemsChannel())
.wireTap(f -> f.handle(logger()))
.route(OrderItem.class,
o -> o.getType().name(),
type -> type.channelMapping(ItemType.BOOK.name(),booksItemsChannel())
.channelMapping(ItemType.MUSIC_CD.name(), musicItemsChannel())
.channelMapping(ItemType.SOFTWARE.name(), softwareItemsChannel())
)
.get();
}
@Bean
IntegrationFlow processBooksItemChannel() {
return IntegrationFlows.from(booksItemsChannel())
.handle(processor,"processBooksOrderItem",spec -> spec.poller(Pollers.fixedDelay(100l)))
.channel(orderItemsProcessedChannel())
// .wireTap(f -> f.handle(logger()))
.log()
.get();
}
@Bean
IntegrationFlow processMusicItemChannel() {
return IntegrationFlows.from(musicItemsChannel())
.handle(processor,"processMusicOrderItem",spec -> spec.poller(Pollers.fixedDelay(100l)))
.channel(orderItemsProcessedChannel())
// .wireTap(f -> f.handle(logger()))
.log()
.get();
}
@Bean
IntegrationFlow processSoftwareItemChannel() {
return IntegrationFlows.from(softwareItemsChannel())
.handle(processor, "processSoftware", spec -> spec.poller(Pollers.fixedDelay(100l)))
.channel(orderItemsProcessedChannel())
// .wireTap(f -> f.handle(logger()))
.log()
.get();
}
@Bean
IntegrationFlow aggreateAllProcessedOrderItems() {
return IntegrationFlows.from(orderItemsProcessedChannel())
.aggregate(spec -> spec.processor(orderDelivery, "delivery"))
.channel(orderDeliveredChannel())
.handle(m -> log.info("The Payload data {} ",m.getPayload()))
.log()
.get();
}
}
我在完成 XML 到 java DSL 配置的最后一步。
有人可以帮助如何解决这个问题。
解决方案
你的问题在这里:
.handle(m -> log.info("The Payload data {} ",m.getPayload()))
.log()
log
当不再有通道时,您就不能使用拦截器。我这么说是因为这handle()
是一个单向组件,没有任何内容可以作为回复发送,因此不会为该log
拦截创建输出通道。
目前尚不清楚您拥有哪个 Spring Integration 版本,但目前我们可以防止这种错误配置:
throw new BeanCreationException("The 'currentComponent' (" + currComponent +
") is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. " +
"This is the end of the integration flow.");
因此,您的错误将在解析阶段立即被拒绝!
推荐阅读
- csv - 从 .csv 写入不同的表
- tensorflow - tf.keras.layers.LSTM 参数的含义
- python - 如何同时启动 Miniconda 和激活环境?
- symfony4 - Symfony 4.3:覆盖来自其他 Bundle 的命令
- mongodb - MongoDB 的 $group 的算法复杂度
- fabricjs - 如何识别组内存在的图像上的点击事件?我想在呈现 json 时识别点击事件
- c++ - 如何在带有背景图像的小部件上制作隐形按钮?
- python - 向后移动时如何在正则表达式模式中的空白处停止
- listview - 增加 ListView 宽度以适应内容并删除水平滚动条
- mysql - 每次我选择 React Native 日期选择器时,我如何才能获得选定的日期、月份、年份和小时、分钟?