首页 > 解决方案 > 如何将消息从 Amazon SQS 传递到服务的所有正在运行的实例

问题描述

我有两个服务,一个是生产者(服务 A),一个是消费者(服务 B)。因此,服务 A 将生成一条消息,该消息将发布到 Amazon SQS 服务,然后在订阅队列时将其传递给服务 B。所以,在我有一个服务 B 实例之前,这可以正常工作。

但是当我启动另一个服务 B 实例时,现在有两个服务 B 实例,它们都订阅同一个队列,因为它是同一个服务,我观察到来自 SQS 的消息现在正在传递循环时尚。这样在给定的时间,只有一个服务 B 的实例接收到服务 A 发布的消息。我希望当一条消息发布到这个队列时,它应该被服务 B 的所有实例接收。

我们应该怎么做?我已将这些服务与 Spring 云依赖项一起开发为 Springboot 应用程序。

请参阅下图以供参考。 在此处输入图像描述

标签: amazon-web-servicesspring-bootjava-8amazon-sqs

解决方案


虽然您的消息可能看起来是以循环方式阅读的,但它们实际上并没有在循环中被消耗。SQS 的工作原理是让任何消费者(具有适当的 IAM 权限)都可以使用所有消息,并在一个消费者在您可以配置的预先配置的时间内获取消息后立即隐藏消息,从而有效地“锁定”该消息。您的所有消费者似乎都以循环方式运行这一事实很可能是巧合。

正如其他人所提到的,您可以使用 SNS 而不是 SQS 将消息一次发送给多个消费者,但这并不像听起来那么简单。如果您的服务 B 是负载均衡的,则 HTTP 端点订阅者将指向负载均衡器的 DNS 名称,因此只有一个实例会收到消息。假设您的实例具有公共 IP,您可以修改您的应用程序,以便在应用程序唤醒时自行注册为主题的 HTTP 订阅者。这里的缺点是您不仅绕过了负载均衡器,而且还失去了 SQS 附带的持久性保证,因为 SNS 主题将尝试发送消息 X 次,但之后会简单地丢弃消息。

另一种解决方案是将 SQS 队列上的消息隐藏超时设置更改为 0,这样消息永远不会被锁定,每个消费者都可以读取它。这也意味着您需要将您的应用程序修改为 a) 不处理消息两次,因为在完成处理时可能会多次读取同一条消息,并且 b) 在其中一个实例删除时优雅地处理故障来自队列的消息和其他实例在此之后尝试从队列中删除该消息。

或者,您可能希望使用某种服务网格或服务发现机制,以便实例可以以对等方式相互通信,以便一个实例可以从 SQS 队列中提取消息并将其传播到服务的其他实例。

您还可以使用 Redis 或 DynamoDB 等分布式存储来保存消息及其当前状态,以便每个实例都可以读取它们,但只有一个实例会插入新行。

最终有一些解决方案可以解决这个问题,但是如果不正确理解用例,就很难做出硬性建议。


推荐阅读