首页 > 解决方案 > 延迟入站适配器和控制总线

问题描述

我的集成流程代码是:

@Bean
    public IntegrationFlow messageFlow() {
        return IntegrationFlows.from(stompInboundChannelAdapter())
                .transform(inBoundStompMsgTransformer::transform)
                .headerFilter("stomp_subscription","content-length")
                .handle(Amqp.outboundAdapter(outboundConfiguration.rabbitTemplate()))
                .get();
    }

我正在使用 Spring Boot。

日志清除状态{transformer}订阅者已添加到输入通道

2019-12-09 18:21:41.752  INFO 18248 --- [           main] o.s.i.s.i.StompInboundChannelAdapter     : started bean 'stompInboundChannelAdapter'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@21e360a'
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {transformer} as a subscriber to the 'stompInputChannel' channel
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.stompInputChannel' has 1 subscriber(s).
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {header-filter} as a subscriber to the 'inboundFlow.channel#0' channel
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.inboundFlow.channel#0' has 1 subscriber(s).
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'inboundFlow.channel#1' channel
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.inboundFlow.channel#1' has 1 subscriber(s).
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'

但是,我遇到了一个异常,并且丢失了队列中的前一/两条消息。它处理剩余的消息。

假设在我启动应用程序之前队列中有 10 条消息。在我启动应用程序后,我得到一个异常,即使日志说订阅者已经添加并且 bean 已经启动,发布异常,8/9 消息被处理。

例外是:org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.stompInputChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage

很明显,上下文还没有完全准备好处理消息,因此是异常。但是日志消息具有误导性。

我的第一个问题:

  1. 那么添加订阅者并启动 bean 究竟意味着什么?这是否意味着一切都已设置,但上下文仍必须准备好处理消息?

为了克服这个问题,正如许多帖子中所建议的那样,我使用控制总线来启动适配器。代码是:

......
@Component
public class ApplicationLifeCycle implements SmartLifecycle {

    @Autowired
    private MessageChannel controlBusChannel;

    @Override
    public void start() {
        System.out.println("Service starting...");
        controlBusChannel.send(new GenericMessage<>("@stompInboundChannelAdapter.start()"));
    }
.....

public class ApplicationLifeCycle implements SmartLifecycle认为它会很方便。

我的第二个问题是:

  1. 这是使用控制总线处理适配器启动/停止的正确/最佳方式吗?如果这不是正确的方法,那么请让我知道正确的方法。

谢谢,

马赫什

标签: spring-integrationspring-integration-dsl

解决方案


我认为这是您其他问题的延续:IntegrationFlow Amqp Channel Adapter is not working in handle()

你有这个:

@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
    StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(stompSessionManager(), "/queue/myQueue");
    adapter.setOutputChannel(stompInputChannel());
    adapter.setPayloadType(ByteString.class);
    return adapter;
}

你没有在这里显示。

问题是您随后在IntegrationFlow. 事实证明,StompInboundChannelAdapterbean 较早启动,然后IntegationFlow被处理并被.transform(inBoundStompMsgTransformer::transform)订阅以处理传入消息。

因此,如果您将其@Bean从其中删除,stompInboundChannelAdapter()它应该适合您。稍后我会看看为什么MessageProducerSupport更早开始,然后IntegrationFlow...


推荐阅读