apache-camel - Camel 中批量消费者的聚合结果(例如来自 SQS)
问题描述
我正在使用来自 SQS FIFO 队列的消息maxMessagesPerPoll=5
。
目前我正在单独处理每条消息,这完全浪费了资源。就我而言,由于我们使用的是 FIFO 队列,并且所有这 5 条消息都与同一个对象相关,因此我可以一起处理它们。
我虽然这可以通过使用aggregate
模式来完成,但我无法得到任何结果。
我的消费者路线如下所示:
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
.process(exchange -> {
// process the message
})
我相信应该可以做这样的事情
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
.aggregate(const(true), new GroupedExchangeAggregationStrategy())
.completionFromBatchConsumer()
.process(exchange -> {
// process ALL messages together as I now have a list of all exchanges
})
但processor
从未调用过。
第二件事:如果我能够完成这项工作,那么什么时候将 ACK 发送到 SQS?何时处理每个单独的消息或何时完成聚合过程?我希望后者
解决方案
问题在于GroupedExchangeAggregationStrategy
当我使用这个策略时,输出是所有交换的“数组”。这意味着完成谓词的交换不再具有初始属性。相反,它有CamelGroupedExchange
并且CamelAggregatedSize
没有使用completionFromBatchConsumer()
由于我实际上并不需要汇总所有交换,因此使用GroupedBodyAggregationStrategy
. 然后交换属性将保持在原始交换中,只有主体将包含一个“数组”
另一种解决方案是使用completionSize(Predicate predicate)
和使用从分组交换中提取必要价值的自定义谓词。
推荐阅读
- flutter - 未找到未定义的名称和吸气剂
- f# - 避免在程序集的两个部分中出现模块和类型定义的错误
- python - Python - 如何在 for 循环中打印 json 的内部键?
- c# - ServiceStack.OrmLite 使用错误的列定义创建表
- scala - 如何实现 CustomKeySerializer
- python - 当尝试在弹性搜索中进行批量更新时。我收到 403 错误?
- python - 如何让 docker 容器在主机内而不是容器内搜索文件?
- telethon - Telethon 无法再获取聊天实体
- python - 使用 genfromtxt 获取数据时遇到问题
- extjs - 如何在单击时显示包含两个单独工作的单选按钮的标题菜单