java - AWS SQS 标准队列的并行轮询 - 消息处理太慢
问题描述
我有一个模块,它以指定的时间间隔轮询 AWS SQS 队列,一次一条消息,使用ReceiveMessageRequest
. 以下是方法:
public static ReceiveMessageResult receiveMessageFromQueue() {
String targetedQueueUrl = sqsClient.getQueueUrl("myAWSqueueName").getQueueUrl();
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(targetedQueueUrl)
.withWaitTimeSeconds(10).withMaxNumberOfMessages(1);
return sqsClient.receiveMessage(receiveMessageRequest);
}
一旦收到并处理了一条消息,它就会从队列中删除,带有DeleteMessageResult
.
public static DeleteMessageResult deleteMessageFromQueue(String receiptHandle) {
log.info("Deleting Message with receipt handle - [{}]", receiptHandle);
String targetedQueueUrl = sqsClient.getQueueUrl("myAWSqueueName").getQueueUrl();
return sqsClient.deleteMessage(new DeleteMessageRequest(targetedQueueUrl, receiptHandle));
}
我创建了一个可执行的 jar 文件,该文件部署在大约 40 个实例中,并且正在主动轮询队列。我可以看到他们每个人都收到消息。但在 AWS SQS 控制台中,我只能在“飞行中消息”列上看到数字 0、1、2 或 3。为什么即使有 40 多个不同的消费者从队列中接收消息,为什么会这样呢?此外,队列中可用的消息数量非常缓慢地减少。
以下是队列的配置参数。
Default Visibility Timeout: 30 seconds
Message Retention Period: 4 days
Maximum Message Size: 256 KB
Receive Message Wait Time: 0 seconds
Messages Available (Visible): 4,776
Delivery Delay: 0 seconds
Messages in Flight (Not Visible): 2
Queue Type: Standard
Messages Delayed: 0
Content-Based Deduplication: N/A
为什么即使有多个消费者,消息也没有得到快速处理?我是否需要修改任何队列参数或接收消息/删除消息请求中的某些内容?请指教。
更新:
所有 EC2 实例和 SQS 都在同一个区域中。消费者(轮询队列的 jar 文件)作为 EC2 实例的启动脚本的一部分运行。它有一个计划任务,每 12 秒轮询一次队列。在将消息推送到队列之前,我启动了 2-3 个实例。(当时我们可能有一些已经运行的实例 - 这增加了队列的接收者数量(上限为 50)。收到消息后,它将执行一些任务(包括一些数据库操作、数据分析和计算、报告文件生成并将报告上传到 S3 等..),大约需要 10-12 秒。完成后,它会从队列中删除消息。下图是过去 1 周的 SQS 指标的屏幕截图(来自 SQS监控控制台)。
解决方案
我会根据所提供的信息尽我所能。有关您的处理循环逻辑、区域设置和指标(见下文)的更多详细信息将有助于改进此答案。
我创建了一个可执行的 jar 文件,该文件部署在大约 40 个实例中,并且正在主动轮询队列。我可以看到他们每个人都收到消息。但在 AWS SQS 控制台中,我只能在“飞行中消息”列上看到数字 0、1、2 或 3。为什么即使有 40 多个不同的消费者从队列中接收消息,为什么会这样呢?此外,队列中可用的消息数量非常缓慢地减少。
为什么即使有多个消费者,消息也没有得到快速处理?我是否需要修改任何队列参数或接收消息/删除消息请求中的某些内容?
您没有看到与您处理消息的主机数量更接近的飞行中数字这一事实肯定表明存在问题 - 您的消息处理速度非常快(似乎并非如此)或你的主人没有做你认为的工作。
通常,从 SQS 获取和删除一条消息应该在几毫秒的范围内。如果没有有关您的设置的更多详细信息,这应该可以帮助您开始进行故障排除。(其中一些步骤可能看起来很明显,但其中的每一个都是我见过的开发人员遇到的现实生活问题的根源。)
- 如果您为每个接收进程删除启动一个新进程,则此开销将大大减慢您的速度。我假设您没有这样做,并且每个主机都在单个进程中运行一个循环
- 验证您的处理循环不会致命并重新启动您(有效地将其变成上述情况)。
- 我假设您还验证了您的流程在消息处理之外也没有做大量工作。
- 您应该生成一些客户端指标来指示 SQS 请求在每个主机上占用的时间。
- Cloudwatch 将部分为您执行此操作,但实际的客户端指标总是有用的。
- 推荐以下基本指标:(1) 接收延迟,(2) 进程延迟,(3) 删除延迟,(4) 整个消息循环延迟 (5) 成功/失败计数器
- 您的 EC2 实例(进行处理的主机)应与 SQS 队列位于同一区域。如果您正在进行跨区域调用,这将影响您的延迟。
- 确保这些主机有足够的 CPU/内存资源来进行处理
- 作为优化,我建议每台主机使用更多线程,更少主机 - 重用客户端连接并最大限度地利用计算资源总是更好。
- 验证运行测试时没有出现中断或持续问题
- 在某些初始化步骤中,在您的应用程序的整个生命周期内只执行
getQueueUrl
一次。您不需要重复调用它,因为它将是相同的 URL- 这实际上是我在您的代码中注意到的第一件事,但它在这里很重要,因为如果上述问题是原因,则会产生更大的影响。
- 如果您的消息处理时间非常短(少于检索和删除消息所需的时间),那么您的主机将花费大部分时间来获取消息。这方面的指标也很重要。
- 在这种情况下,您可能应该进行批量获取,而不是一次一个。
- 根据您队列中的消息数量以及消息进展缓慢的评论,听起来情况并非如此。
- 验证所有主机实际上都在访问同一个队列(而不是某些 beta/gamma 版本,或者您曾经用于测试的旧版本)
进一步说明:
- 另一个答案表明可见性超时是一个潜在的原因——这是完全错误的。可见性超时不会阻塞队列 - 它只会影响消息在另一个 receiveMessageRequest 可以接收该消息之前保持“正在运行”的时间。
- 如果您想在出现错误/处理器速度慢的情况下尽快重新处理您的消息,您会考虑减少这种情况。
推荐阅读
- function - 重构箭头函数以兼容 IE
- android-studio - Android Studio 3.3.1 中关于 Github 的 Update Project 和 Pull 命令有什么不同?
- webserver - 使用 Esp8266 传递强制门户
- android - Retrofit2:adapter-rxjava 证书问题
- powershell - 根据内容有选择地更新文件中的行
- elasticsearch - 弹性:我想写一个查询(在特定月份的任何时候遇到的每小时峰值量)
- javascript - 这个素数算法的时间复杂度是多少
- linux - Matlab的fwrite:跳过的字节会发生什么?
- python - `MANIFEST.in` 的作用是什么?
- javascript - React 如何在 DOM 中渲染复杂的数据数组?