首页 > 解决方案 > 带有 SQS/SNS 的 MassTransit。发布到 SNS?

问题描述

有一个带有 SQS 的 MassTransit 的官方示例。“总线”配置为使用 SQS (x.UsingAmazonSqs)。接收端点是一个 SQS,它又订阅了一个 SNS 主题。但是没有示例如何发布到 SNS。

  1. 如何发布到 SNS 主题?
  2. 由于我是针对 localstack 开发的,如何配置 SQS/SNS 以使用 http?

AWS SDK 版本:

var cfg = new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://localhost:4566", UseHttp = true };

更新:

在 Chris 的参考和配置实验之后,我为“localstack”SQS/SNS 提出了以下建议。此配置执行没有错误,并且 Worker 被调用,并将消息发布到总线。但是,消费者类没有被触发,并且消息似乎没有最终进入队列(或者更确切地说是主题)。

public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};

...
services.AddMassTransit(x =>
{
    x.AddConsumer<MessageConsumer>();
    x.UsingAmazonSqs((context, cfg) =>
    {
        cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
        {
            h.Config(AmazonSQSConfig);
            h.Config(AmazonSnsConfig);

            h.EnableScopedTopics();
        });

        cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
        {
            e.Subscribe("deal-topic", s =>
            {
            });
        });
    });
});
  
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
 

更新 2:

当我查看 sns 订阅时,我发现通过 aws cli 手动创建和订阅的第一个具有正确的端点,而由 MassTransit 库创建的第二个具有不正确的端点。如何为 SQS 队列配置 Endpoint?

$ aws --endpoint-url=http://localhost:4566 sns list-subscriptions-by-topic --topic-arn "arn:aws:sns:us-east-1:000000000000:deal-topic"
{
    "Subscriptions": [
        {
            "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:c804da4a-b12c-4203-83ec-78492a77b262",
            "Owner": "",
            "Protocol": "sqs",
            "Endpoint": "http://localhost:4566/000000000000/deal_queue",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
        },
        {
            "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:b47d8361-0717-413a-92ee-738d14043a87",
            "Owner": "",
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:000000000000:deal_queue",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
        }

更新 3:

我已经克隆了该项目并为 AmazonSQS 总线配置运行了该项目的一些单元测试,消费者似乎没有工作。 在此处输入图像描述

当我在测试运行后列出订阅时,我可以看出端点不正确。

...
{
    "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage:e16799c2-9dd3-458d-bc28-52a16d646de3",
    "Owner": "",
    "Protocol": "sqs",
    "Endpoint": "arn:aws:sqs:us-east-1:000000000000:input_queue",
    "TopicArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage"
},
...

会不会是用于 localstack 的 AmazonSQS 有一个重大错误?

目前尚不清楚如何使用带有“localstack”sqs 的库,如何指出 SQS 队列的实际端点(QueueUrl)。

标签: amazon-sqsamazon-snsmasstransit

解决方案


看起来您的配置已正确设置为发布,但至少有几个原因我能想到您没有收到消息的原因:

  1. 当前版本的 localstack 存在问题。我不得不使用 0.11.2 - 请参阅Localstack with MassTransit not getting messages
  2. 您正在发布到不同的主题。Masstransit 将使用消息类型的名称创建主题。这可能与您在接收端点上配置的主题不匹配。您可以通过配置拓扑来更改主题名称 - 请参阅使用 MassTransit SQS 时如何配置主题名称?
  3. 您的消费者未在接收端点上配置 - 请参见下面的示例
public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};

...
services.AddMassTransit(x =>
{
    x.UsingAmazonSqs((context, cfg) =>
    {
        cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
        {
            h.Config(AmazonSQSConfig);
            h.Config(AmazonSnsConfig);
        });

        cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
        {
            e.Subscribe("deal-topic", s => {});
            e.Consumer<MessageConsumer>();
        });
    });
});
  
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();

从我在有关消费者的文档中看到的内容,您应该能够AddMastTransit像原始示例一样将您的消费者添加到配置中,但这对我不起作用。


推荐阅读