首页 > 解决方案 > Camel 中批量消费者的聚合结果(例如来自 SQS)

问题描述

我正在使用来自 SQS FIFO 队列的消息maxMessagesPerPoll=5

目前我正在单独处理每条消息,这完全浪费了资源。就我而言,由于我们使用的是 FIFO 队列,并且所有这 5 条消息都与同一个对象相关,因此我可以一起处理它们。

我虽然这可以通过使用aggregate模式来完成,但我无法得到任何结果。

我的消费者路线如下所示:

from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
    .process(exchange -> {
        // process the message
    })

我相信应该可以做这样的事情

from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
    .aggregate(const(true), new GroupedExchangeAggregationStrategy())
    .completionFromBatchConsumer()
    .process(exchange -> {
        // process ALL messages together as I now have a list of all exchanges
    })

processor从未调用过。

第二件事:如果我能够完成这项工作,那么什么时候将 ACK 发送到 SQS?何时处理每个单独的消息或何时完成聚合过程?我希望后者

标签: apache-camelamazon-sqs

解决方案


问题在于GroupedExchangeAggregationStrategy

当我使用这个策略时,输出是所有交换的“数组”。这意味着完成谓词的交换不再具有初始属性。相反,它有CamelGroupedExchange并且CamelAggregatedSize没有使用completionFromBatchConsumer()

由于我实际上并不需要汇总所有交换,因此使用GroupedBodyAggregationStrategy. 然后交换属性将保持在原始交换中,只有主体将包含一个“数组”

另一种解决方案是使用completionSize(Predicate predicate)和使用从分组交换中提取必要价值的自定义谓词。


推荐阅读