首页 > 解决方案 > 在 Spring Integrration 中,DSL 如何指定订阅已发布频道?

问题描述

使用 Spring Integration DSL 构建器模式时,它通常会“自动”填充元素之间所需的通道。但是,有时它不会。

在高层次上,包装应用程序将元数据保存在数据库中,以便根据需要跨我们(可能)从未见过的平台动态创建和销毁流。因此,流不适合使用 @Bean 等静态符号进行实例化,而必须在运行时在 Spring 上下文中动态创建和销毁,以及注册/注销。

我在动态创建的主流中使用了一个已发布的消息通道,在动态创建的子流中使用了一个通道,但是我看不到如何从 subFlow 订阅 mainPublishChannel。

这让我将消息推送到频道中,但是没有订阅,什么也没有发生。

提前致谢。


一些先前的研究(不是详尽的清单:

https://github.com/spring-projects/spring-integration-flow

https://dzone.com/articles/spring-integration-building

https://xpadro.com/2014/05/spring-integration-4-0-a-complete-xml-free-example.html

Spring集成网关“调度程序没有订阅者”

Spring Integration - 如何调试“调度程序没有订阅者”?


日志片段

task-scheduler-1 2020-12-31 00:25:32,526 INFO  o.s.i.g.GatewayProxyFactoryBean - started b653ca1c-038d-4567-bd4e-4c16ecc502a3.org.springframework.integration.config.ConsumerEndpointFactoryBean#3#gpfb
task-scheduler-1 2020-12-31 00:25:32,538 DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'b653ca1c-038d-4567-bd4e-4c16ecc502a3.mainPublishChannel', message: GenericMessage [payload=[{... timestamp=1609395932538}]
task-scheduler-1 2020-12-31 00:25:32,539 DEBUG o.s.i.d.BroadcastingDispatcher - No subscribers, default behavior is ignore
task-scheduler-1 2020-12-31 00:25:32,539 DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'b653ca1c-038d-4567-bd4e-4c16ecc502a3.mainPublishChannel', message: GenericMessage [payload=[{aaa=ee}], headers={aaa=ee, sequenceNumber=1, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7771e96a, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7771e96a, sequenceSize=2, yyy=2020-12-24 11:15:30.915278, correlationId=0eef0e4e-768c-90db-fa7b-2d1767335b26, timestamp=1609395932538}]

代码片段:

    String channelId=getId().toString()+'.'+"mainPublishChannel";
    MessageChannel channel = MessageChannels.publishSubscribe(channelId, stepTaskExecutor).get();

    final IntegrationFlowBuilder bldr = IntegrationFlows
        .from(setupAdapter,
                c -> c.poller(Pollers.fixedRate(pollerFixedRate, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
        .enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
                .headerExpression("yyy", "payload[0].get(\"yyy\")"))
        .split(tableSplitter)
        .gateway(channel)   
        .routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules, channel)
                    ) 
        .aggregate()
        .handle(cleanupAdapter)
        ;
...
snip
...
private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
        Collection<RuleMetadata> rules, MessageChannel publishedChannel) {

    // ??? How to subscribe this to publishedChannel??
    recipientListSpec
        .recipient(MessageChannels.publishSubscribe(this.getId().toString()+'.'+"mainReceiveChannel", stepTaskExecutor).get());
    
    rules.forEach(
            rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));
            
    recipientListSpec
        .ignoreSendFailures(true)
        .defaultOutputToParentFlow();
    
    return recipientListSpec;
}

标签: spring-integrationspring-integration-dsl

解决方案


publishedChannel 必须作为输入通道传递给子流

        return flowDef
            .channel(receiveChannel) //  <---- This is the reference to the main publish channel in the child flow, which allows the builder to create the subscription
            .log()
            .handle(inboundAdapter)
... snip ...

            ;

推荐阅读