.net-core - 在 Amazon SQS 传输上发布的 NServiceBus 路由器事件不由 Azure 服务总线传输端点处理
问题描述
我一直在尝试让 NServiceBus.Router 工作以允许使用 AmazonSQS 传输和 AzureServiceBus 传输的端点相互通信。到目前为止,我能够通过路由器从 ASB 端点发送并由 SQS 端点处理的命令。但是,当我从 SQS 端点发布事件时,即使我已将 SQS 端点注册为发布者,ASB 端点也不会处理它。我不知道我做错了什么,但是看看我可以从docs找到的每个示例,似乎它应该可以工作。
我已经尝试在下面添加另一个转发路由(SQS 到 ASB),但这并没有解决问题。
端点和路由器都在 .net 5 worker 服务中运行。
我已经制作了一个示例项目来重现这里的问题,但这里有一些快速概览的片段显示了相关设置:
路由器设置
var routerConfig = new RouterConfiguration("ASBToSQS.Router");
var azureInterface = routerConfig.AddInterface<AzureServiceBusTransport>("ASB", t =>
{
t.ConnectionString(Environment.GetEnvironmentVariable("ASB_CONNECTION_STRING"));
t.Transactions(TransportTransactionMode.ReceiveOnly);
t.SubscriptionRuleNamingConvention((entityType) =>
{
var entityPathOrName = entityType.Name;
if (entityPathOrName.Length >= 50)
{
return entityPathOrName.Split('.').Last();
}
return entityPathOrName;
});
});
var sqsInterface = routerConfig.AddInterface<SqsTransport>("SQS", t =>
{
t.UnrestrictedDurationDelayedDelivery();
t.Transactions(TransportTransactionMode.ReceiveOnly);
var settings = t.GetSettings();
// Avoids a missing setting error
//https://github.com/SzymonPobiega/NServiceBus.Raw/blob/master/src/AcceptanceTests.SQS/Helper.cs#L18
bool isMessageType(Type t) => true;
var ctor = typeof(MessageMetadataRegistry).GetConstructor(
BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance, null,
new[] {typeof(Func<Type, bool>)}, null);
#pragma warning disable CS0618 // Type or member is obsolete
settings.Set<MessageMetadataRegistry>(ctor.Invoke(new object[] {(Func<Type, bool>) isMessageType}));
#pragma warning restore CS0618 // Type or member is obsolete
});
var staticRouting = routerConfig.UseStaticRoutingProtocol();
staticRouting.AddForwardRoute("ASB", "SQS");
routerConfig.AutoCreateQueues();
ASB 端点设置
var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.ASBEndpoint");
var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
transport.SubscriptionRuleNamingConvention((entityType) =>
{
var entityPathOrName = entityType.Name;
if (entityPathOrName.Length >= 50)
{
return entityPathOrName.Split('.').Last();
}
return entityPathOrName;
});
transport.Transactions(TransportTransactionMode.ReceiveOnly);
transport.ConnectionString(Environment.GetEnvironmentVariable("ASB_CONNECTION_STRING"));
var bridge = transport.Routing().ConnectToRouter("ASBToSQS.Router");
bridge.RouteToEndpoint(typeof(ASBToSQSCommand), "ASBToSQSRouter.SQSEndpoint");
bridge.RegisterPublisher(typeof(ASBToSQSEvent), "ASBToSQSRouter.SQSEndpoint");
endpointConfiguration.EnableInstallers();
SQS Endpoint Setup(没什么特别的,因为它不需要知道路由器)
var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.SQSEndpoint");
var transport = endpointConfiguration.UseTransport<SqsTransport>();
transport.UnrestrictedDurationDelayedDelivery();
transport.Transactions(TransportTransactionMode.ReceiveOnly);
endpointConfiguration.EnableInstallers();
任何帮助将不胜感激!
解决方案
不幸的是,最近的 SQS 传输版本之一包含一项更改,该更改使订阅默认情况下仅在完整的 NServiceBus 端点的上下文中工作。此功能是订阅批处理。
为了让路由器正常工作(路由器不运行完整的端点,只是 NServiceBus 传输),您需要将这条神奇的线添加到 SQS 接口配置中:
settings.Set("NServiceBus.AmazonSQS.DisableSubscribeBatchingOnStart", true);
这是一个未记录的标志,它禁用订阅批处理并允许路由器正常完成订阅操作。
有所不便,敬请谅解。
推荐阅读
- sas - 在日期之间循环
- python - 如何删除复选框中的数据?
- android - 如何在 Flutter 中获取 AppBar 高度?
- html - 使用 CSS Grids 和 @Media Q 制作移动 1st 网站
- windows - 有没有办法在使用堆栈构建时在 powershell 终端上正确格式化 GHC 错误消息?
- python - 与 pyinstaller 捆绑后的 pyexcel.exceptions.UnknownParameters 错误
- tensorflow - 将 TensorFlow 梯度计算分成两个(或更多)部分
- php - 如何在 FOUND_ROWS() 上使用 COALESCE()?
- angular - 将 vendor.js 拆分为多个块
- php - 检查用户是否在线 laravel