spring-integration - Spring Integration 流程调用 REST 服务
问题描述
我在我的项目中定义了以下集成流程
///
public IntegrationFlow acarsEventFlow() {
return IntegrationFlows
//.from(Jms.messageDrivenChannelAdapter(this.acarsMqListener)) //Get Message from MQ
.from(org.springframework.integration.jms.dsl.Jms.messageDrivenChannelAdapter(
org.springframework.integration.jms.dsl.Jms.container(this.acarsMqConnectionFactory, this.acarsQueue)
.transactionManager(transactionManager(this.acarsMqConnectionFactory))
.get()))
.wireTap(ACARS_WIRE_TAP_CHNL)
.transform(agmTransformer, "parseXMLMessage") //
.handle(acarsProcessor, "pushRawMessage") // (1)Call web service to push the message payload and if it fails then don't commit the transaction and rollback the message
.transform(agmTransformer, "populateSmi")
.filter(acarsFilter,"filterMessageOnSmi") //
.transform(agmTransformer, "populateImi") //
.filter(acarsFilter,"filterMessageOnSmiImi") //
.transform(acarsProcessor,"processEvent") //
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.bridge(e -> e.order(Ordered.HIGHEST_PRECEDENCE))
.enrichHeaders(h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY, "payload.flightNbr")) //Add flight number as key
.transform("payload.message") // publish the transformed message
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(acarsKafkaTopic))) //publish to kafka
.subscribe(flow -> flow
.channel(UPDATE_DATA_STORE_CHNL)))
.get();
}
///
我从 MQ 获取消息,启动事务管理器以确保消息回滚,除非它被处理。现在在其中一个句柄方法# pushRawMessage() [请参考上面代码片段中的注释(1)Call web service to push message payload] 我需要调用一个webservice。目前我只是从处理程序内部调用 web 服务 - pushRawMessage()。引入消息传递网关来调用第 3 方 Web 服务是个好主意吗?如果我们引入一个消息网关,那么我们如何确保在 web 服务关闭时回滚原始消息?
解决方案
可以像现在一样拥有它。此外,最好使用 a.gateway()
为该 Web 服务进程执行一些子流程。只要一切都在同一个线程中完成,当您只使用直接通道时,一切都将参与同一个事务。因此,该子流中的任何错误都会导致事务回滚。
只要您使用gateway()
. 无论如何,它将在当前线程中等待回复或错误。因此,事务将再次回滚。
推荐阅读
- html - 将两个旋转的 div 正确嵌入并定位在矩形的角上?
- javascript - 单击锚点后如何提醒主题标签
- c# - 如何编写 C# 代码以在 AGauge 中添加新范围或编辑?
- android - 如何区分自定义视图中的点击?
- java - Spring MVC 中的模型对象
- sql - Postgresql:一列表的主键
- php - 从php中的数组中找到最近的下一个日期
- python - 试图通过 gspread 在 Google 表格中找到最高的空单元格
- sql - 为什么我的 SELECT 语句在 SELECT MAX(ID) 时返回 2 行?
- machine-learning - 是否可以逐步训练 sklearn 模型(例如 SVM)?