spring-integration - 使用 spring 集成 dsl 提高消息处理的性能
问题描述
我正在使用 spring 集成 dsl 从 Kafka 接收消息,然后在 Oracle、Couchbase 中解析、丰富和持久化,并将其发布到另一个 Kafka 主题以供下线渠道使用。
持久化和发布需要在事务中,以便所有数据源/数据存储同步。如果 Couchbase、Oracle 或 Publish Kafka 主题不可用,则回滚事务。
同时,我不希望在处理消息时出现延迟,因为这是对业务用户的实时更新。
return IntegrationFlows
// .from(Jms.messageDrivenChannelAdapter(this.acarsMqListener)) //Get Message
// from MQ
.from(org.springframework.integration.jms.dsl.Jms
.messageDrivenChannelAdapter(org.springframework.integration.jms.dsl.Jms
.container(this.acarsMqConnectionFactory, this.acarsQueue)
.transactionManager(transactionManager(this.acarsMqConnectionFactory)).get()))
.wireTap(ACARS_WIRE_TAP_CHNL) // Log the raw messaged
.transform(agmTransformer, "parseXMLMessage") // Parse the AGM xml message
.handle(acarsProcessor, "pushAcarsRawData") // push raw acars data
.wireTap(ACARS_WIRE_TAP_CHNL_DYNAMODB) // Log the raw messaged
.transform(agmTransformer, "populateSmi") // Populate SMI
// .transform(agmTransformer, "populateSmi") //Populate SMI
.filter(acarsFilter, "filterMessageOnSmi") // Filter on SMI
.transform(agmTransformer, "populateImi") // Populate IMI
.filter(acarsFilter, "filterMessageOnSmiImi") // Filter on IMI
.transform(acarsProcessor, "processEvent") // Parse
.publishSubscribeChannel(
pubSub -> pubSub
.subscribe(flow -> flow.bridge(e -> e.order(Ordered.HIGHEST_PRECEDENCE))
.enrichHeaders(
h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY, "payload.flightNbr")) // Add flight number as key
.transform("payload.message") // publish the transformed message
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(acarsKafkaTopic))) // publish to kafka
.subscribe(flow -> flow.channel(UPDATE_DATA_STORE_CHNL))) // send to a different channel to update couchbase
.get();
您能否建议在集成流程中可以做些什么来提高处理性能。
解决方案
您应该分析您的应用程序以查看瓶颈在哪里。
通常,为了提高吞吐量,您需要增加concurrency
侦听器容器以并行处理消息。使用 Kafka,您至少需要与并发线程一样多的分区。
如果瓶颈在您的下游组件之一中,增加并发性可能无济于事;因此需要进行剖析。
推荐阅读
- c# - 是否可以在不使用 XAML 的情况下更新相机位置和方向?
- spring-boot - 使用 Docker Desktop 的 Spring Boot 应用程序日志
- kubernetes - 如何在 Kubernetes 集群上安装 Ceph
- python - 按名称从上下文中获取值
- php - 根据数据库 Laravel 中的 user_id 获取城市值
- laravel - 注销时如何获取 DATETIME 数据?
- java - 设置日期时无法保存实体
- javascript - ClientIDMode="Static" 中的 Telerik RadDatePicker 问题
- django-rest-framework - 无法为多个数据生成发布请求
- ruby-on-rails - 如何获取数组中的下一个和上一个元素,Ruby