amazon-sqs - 带有 SQS/SNS 的 MassTransit。发布到 SNS?
问题描述
有一个带有 SQS 的 MassTransit 的官方示例。“总线”配置为使用 SQS (x.UsingAmazonSqs)。接收端点是一个 SQS,它又订阅了一个 SNS 主题。但是没有示例如何发布到 SNS。
- 如何发布到 SNS 主题?
- 由于我是针对 localstack 开发的,如何配置 SQS/SNS 以使用 http?
AWS SDK 版本:
var cfg = new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://localhost:4566", UseHttp = true };
更新:
在 Chris 的参考和配置实验之后,我为“localstack”SQS/SNS 提出了以下建议。此配置执行没有错误,并且 Worker 被调用,并将消息发布到总线。但是,消费者类没有被触发,并且消息似乎没有最终进入队列(或者更确切地说是主题)。
public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};
...
services.AddMassTransit(x =>
{
x.AddConsumer<MessageConsumer>();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.Config(AmazonSQSConfig);
h.Config(AmazonSnsConfig);
h.EnableScopedTopics();
});
cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
{
e.Subscribe("deal-topic", s =>
{
});
});
});
});
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
更新 2:
当我查看 sns 订阅时,我发现通过 aws cli 手动创建和订阅的第一个具有正确的端点,而由 MassTransit 库创建的第二个具有不正确的端点。如何为 SQS 队列配置 Endpoint?
$ aws --endpoint-url=http://localhost:4566 sns list-subscriptions-by-topic --topic-arn "arn:aws:sns:us-east-1:000000000000:deal-topic"
{
"Subscriptions": [
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:c804da4a-b12c-4203-83ec-78492a77b262",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "http://localhost:4566/000000000000/deal_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
},
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:b47d8361-0717-413a-92ee-738d14043a87",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "arn:aws:sqs:us-east-1:000000000000:deal_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
}
更新 3:
我已经克隆了该项目并为 AmazonSQS 总线配置运行了该项目的一些单元测试,消费者似乎没有工作。
当我在测试运行后列出订阅时,我可以看出端点不正确。
...
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage:e16799c2-9dd3-458d-bc28-52a16d646de3",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "arn:aws:sqs:us-east-1:000000000000:input_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage"
},
...
会不会是用于 localstack 的 AmazonSQS 有一个重大错误?
目前尚不清楚如何使用带有“localstack”sqs 的库,如何指出 SQS 队列的实际端点(QueueUrl)。
解决方案
看起来您的配置已正确设置为发布,但至少有几个原因我能想到您没有收到消息的原因:
- 当前版本的 localstack 存在问题。我不得不使用 0.11.2 - 请参阅Localstack with MassTransit not getting messages
- 您正在发布到不同的主题。Masstransit 将使用消息类型的名称创建主题。这可能与您在接收端点上配置的主题不匹配。您可以通过配置拓扑来更改主题名称 - 请参阅使用 MassTransit SQS 时如何配置主题名称?
- 您的消费者未在接收端点上配置 - 请参见下面的示例
public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};
...
services.AddMassTransit(x =>
{
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.Config(AmazonSQSConfig);
h.Config(AmazonSnsConfig);
});
cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
{
e.Subscribe("deal-topic", s => {});
e.Consumer<MessageConsumer>();
});
});
});
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
从我在有关消费者的文档中看到的内容,您应该能够AddMastTransit
像原始示例一样将您的消费者添加到配置中,但这对我不起作用。
推荐阅读
- python-3.8 - python 3.8 and pip gives this error[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate is not yet valid (_ssl.c:1123)')))
- grep - 如何在 Linux 中用 grep 查找带有星号的行?
- python - 按项目布尔过滤字典 - API Python
- python - 如何在 python 字典中迭代公式并将结果保存在 pandas dataFrame 中?
- c# - Why does pattern matching not compile with JToken
- webpack - 如何使用 Webpack 删除开发代码?
- java - Java:如何分离子进程或创建分离的进程
- vue.js - Vue.js - 如何在 vue v-for 循环中按值 2 递增索引?
- vba - 查询短文本字段中的文本返回“类型不匹配”
- python - 分批拆分张量流数据集