spring-integration - 在 Spring Integrration 中,DSL 如何指定订阅已发布频道?
问题描述
使用 Spring Integration DSL 构建器模式时,它通常会“自动”填充元素之间所需的通道。但是,有时它不会。
在高层次上,包装应用程序将元数据保存在数据库中,以便根据需要跨我们(可能)从未见过的平台动态创建和销毁流。因此,流不适合使用 @Bean 等静态符号进行实例化,而必须在运行时在 Spring 上下文中动态创建和销毁,以及注册/注销。
我在动态创建的主流中使用了一个已发布的消息通道,在动态创建的子流中使用了一个通道,但是我看不到如何从 subFlow 订阅 mainPublishChannel。
这让我将消息推送到频道中,但是没有订阅,什么也没有发生。
提前致谢。
一些先前的研究(不是详尽的清单:
https://github.com/spring-projects/spring-integration-flow
https://dzone.com/articles/spring-integration-building
https://xpadro.com/2014/05/spring-integration-4-0-a-complete-xml-free-example.html
Spring Integration - 如何调试“调度程序没有订阅者”?
日志片段
task-scheduler-1 2020-12-31 00:25:32,526 INFO o.s.i.g.GatewayProxyFactoryBean - started b653ca1c-038d-4567-bd4e-4c16ecc502a3.org.springframework.integration.config.ConsumerEndpointFactoryBean#3#gpfb
task-scheduler-1 2020-12-31 00:25:32,538 DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'b653ca1c-038d-4567-bd4e-4c16ecc502a3.mainPublishChannel', message: GenericMessage [payload=[{... timestamp=1609395932538}]
task-scheduler-1 2020-12-31 00:25:32,539 DEBUG o.s.i.d.BroadcastingDispatcher - No subscribers, default behavior is ignore
task-scheduler-1 2020-12-31 00:25:32,539 DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'b653ca1c-038d-4567-bd4e-4c16ecc502a3.mainPublishChannel', message: GenericMessage [payload=[{aaa=ee}], headers={aaa=ee, sequenceNumber=1, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7771e96a, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7771e96a, sequenceSize=2, yyy=2020-12-24 11:15:30.915278, correlationId=0eef0e4e-768c-90db-fa7b-2d1767335b26, timestamp=1609395932538}]
代码片段:
String channelId=getId().toString()+'.'+"mainPublishChannel";
MessageChannel channel = MessageChannels.publishSubscribe(channelId, stepTaskExecutor).get();
final IntegrationFlowBuilder bldr = IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(pollerFixedRate, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
.headerExpression("yyy", "payload[0].get(\"yyy\")"))
.split(tableSplitter)
.gateway(channel)
.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules, channel)
)
.aggregate()
.handle(cleanupAdapter)
;
...
snip
...
private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
Collection<RuleMetadata> rules, MessageChannel publishedChannel) {
// ??? How to subscribe this to publishedChannel??
recipientListSpec
.recipient(MessageChannels.publishSubscribe(this.getId().toString()+'.'+"mainReceiveChannel", stepTaskExecutor).get());
rules.forEach(
rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));
recipientListSpec
.ignoreSendFailures(true)
.defaultOutputToParentFlow();
return recipientListSpec;
}
解决方案
publishedChannel 必须作为输入通道传递给子流
return flowDef
.channel(receiveChannel) // <---- This is the reference to the main publish channel in the child flow, which allows the builder to create the subscription
.log()
.handle(inboundAdapter)
... snip ...
;
推荐阅读
- python - 在python中用“k”元素附加“i”元素和“j”元素
- c++ - 为什么我不能使用指向字符数组的指针来填充向量?
- powershell - powershell:从两个 csv 文件中提取数据,输出带有匹配电子邮件的 csv 文件,并在下一列中添加与该电子邮件匹配的 ID 号
- api - 如何在 Binance API 的 P2P 中获取最新价格?
- java - 使用 Opengamma Strata 复制 Excel 价格函数
- xgboost - R中的xgboost特征重要性
- google-cloud-platform - 如何设置 Google Cloud 负载平衡器以允许使用 cname 记录指向域?
- testing - 如何从 Cloud Build“gradle test”配置无服务器 VPC 访问?
- typescript - 我可以让我的自定义类可以从 TypeScript 中的数字分配吗?
- javascript - 如何加快获取javascript数组的数据