首页 > 解决方案 > 使用 spring 集成 dsl 提高消息处理的性能

问题描述

我正在使用 spring 集成 dsl 从 Kafka 接收消息,然后在 Oracle、Couchbase 中解析、丰富和持久化,并将其发布到另一个 Kafka 主题以供下线渠道使用。

持久化和发布需要在事务中,以便所有数据源/数据存储同步。如果 Couchbase、Oracle 或 Publish Kafka 主题不可用,则回滚事务。

同时,我不希望在处理消息时出现延迟,因为这是对业务用户的实时更新。

        return IntegrationFlows
                // .from(Jms.messageDrivenChannelAdapter(this.acarsMqListener)) //Get Message
                // from MQ
                .from(org.springframework.integration.jms.dsl.Jms
                        .messageDrivenChannelAdapter(org.springframework.integration.jms.dsl.Jms
                                .container(this.acarsMqConnectionFactory, this.acarsQueue)
                                .transactionManager(transactionManager(this.acarsMqConnectionFactory)).get()))
                .wireTap(ACARS_WIRE_TAP_CHNL) // Log the raw messaged
                .transform(agmTransformer, "parseXMLMessage") // Parse the AGM xml message
                .handle(acarsProcessor, "pushAcarsRawData") // push raw acars data
                .wireTap(ACARS_WIRE_TAP_CHNL_DYNAMODB) // Log the raw messaged
                .transform(agmTransformer, "populateSmi") // Populate SMI
                // .transform(agmTransformer, "populateSmi") //Populate SMI
                .filter(acarsFilter, "filterMessageOnSmi") // Filter on SMI
                .transform(agmTransformer, "populateImi") // Populate IMI
                .filter(acarsFilter, "filterMessageOnSmiImi") // Filter on IMI
                .transform(acarsProcessor, "processEvent") // Parse
                .publishSubscribeChannel(
                        pubSub -> pubSub
                                .subscribe(flow -> flow.bridge(e -> e.order(Ordered.HIGHEST_PRECEDENCE))
                                        .enrichHeaders(
                                                h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY, "payload.flightNbr")) // Add flight number as key
                                        .transform("payload.message") // publish the transformed message
                                        .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(acarsKafkaTopic))) // publish to kafka
                                .subscribe(flow -> flow.channel(UPDATE_DATA_STORE_CHNL))) // send to a different channel to update couchbase
                .get();

您能否建议在集成流程中可以做些什么来提高处理性能。

标签: spring-integrationspring-integration-dsl

解决方案


您应该分析您的应用程序以查看瓶颈在哪里。

通常,为了提高吞吐量,您需要增加concurrency侦听器容器以并行处理消息。使用 Kafka,您至少需要与并发线程一样多的分区。

如果瓶颈在您的下游组件之一中,增加并发性可能无济于事;因此需要进行剖析。


推荐阅读