首页 > 解决方案 > 在 Amazon SQS 传输上发布的 NServiceBus 路由器事件不由 Azure 服务总线传输端点处理

问题描述

我一直在尝试让 NServiceBus.Router 工作以允许使用 AmazonSQS 传输和 AzureServiceBus 传输的端点相互通信。到目前为止,我能够通过路由器从 ASB 端点发送并由 SQS 端点处理的命令。但是,当我从 SQS 端点发布事件时,即使我已将 SQS 端点注册为发布者,ASB 端点也不会处理它。我不知道我做错了什么,但是看看我可以从docs找到的每个示例,似乎它应该可以工作。

我已经尝试在下面添加另一个转发路由(SQS 到 ASB),但这并没有解决问题。

端点和路由器都在 .net 5 worker 服务中运行。

我已经制作了一个示例项目来重现这里的问题,但这里有一些快速概览的片段显示了相关设置:

路由器设置

var routerConfig = new RouterConfiguration("ASBToSQS.Router");

var azureInterface = routerConfig.AddInterface<AzureServiceBusTransport>("ASB", t =>
{
    t.ConnectionString(Environment.GetEnvironmentVariable("ASB_CONNECTION_STRING"));

    t.Transactions(TransportTransactionMode.ReceiveOnly);
    t.SubscriptionRuleNamingConvention((entityType) =>
    {
        var entityPathOrName = entityType.Name;
        if (entityPathOrName.Length >= 50)
        {
            return entityPathOrName.Split('.').Last();
        }

        return entityPathOrName;
    });
});

var sqsInterface = routerConfig.AddInterface<SqsTransport>("SQS", t =>
{
    t.UnrestrictedDurationDelayedDelivery();

    t.Transactions(TransportTransactionMode.ReceiveOnly);

    var settings = t.GetSettings();

    // Avoids a missing setting error
    //https://github.com/SzymonPobiega/NServiceBus.Raw/blob/master/src/AcceptanceTests.SQS/Helper.cs#L18
    bool isMessageType(Type t) => true;
    var ctor = typeof(MessageMetadataRegistry).GetConstructor(
        BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance, null,
        new[] {typeof(Func<Type, bool>)}, null);
#pragma warning disable CS0618 // Type or member is obsolete
    settings.Set<MessageMetadataRegistry>(ctor.Invoke(new object[] {(Func<Type, bool>) isMessageType}));
#pragma warning restore CS0618 // Type or member is obsolete

});

var staticRouting = routerConfig.UseStaticRoutingProtocol();

staticRouting.AddForwardRoute("ASB", "SQS");

routerConfig.AutoCreateQueues();

ASB 端点设置

var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.ASBEndpoint");

var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();

transport.SubscriptionRuleNamingConvention((entityType) =>
{
    var entityPathOrName = entityType.Name;
    if (entityPathOrName.Length >= 50)
    {
        return entityPathOrName.Split('.').Last();
    }

    return entityPathOrName;
});

transport.Transactions(TransportTransactionMode.ReceiveOnly);
transport.ConnectionString(Environment.GetEnvironmentVariable("ASB_CONNECTION_STRING"));

var bridge = transport.Routing().ConnectToRouter("ASBToSQS.Router");

bridge.RouteToEndpoint(typeof(ASBToSQSCommand), "ASBToSQSRouter.SQSEndpoint");
bridge.RegisterPublisher(typeof(ASBToSQSEvent), "ASBToSQSRouter.SQSEndpoint");

endpointConfiguration.EnableInstallers();

SQS Endpoint Setup(没什么特别的,因为它不需要知道路由器)

var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.SQSEndpoint");

var transport = endpointConfiguration.UseTransport<SqsTransport>();

transport.UnrestrictedDurationDelayedDelivery();

transport.Transactions(TransportTransactionMode.ReceiveOnly);

endpointConfiguration.EnableInstallers();

任何帮助将不胜感激!

标签: .net-corenservicebus

解决方案


不幸的是,最近的 SQS 传输版本之一包含一项更改,该更改使订阅默认情况下仅在完整的 NServiceBus 端点的上下文中工作。此功能是订阅批处理。

为了让路由器正常工作(路由器不运行完整的端点,只是 NServiceBus 传输),您需要将这条神奇的线添加到 SQS 接口配置中:

settings.Set("NServiceBus.AmazonSQS.DisableSubscribeBatchingOnStart", true);

这是一个未记录的标志,它禁用订阅批处理并允许路由器正常完成订阅操作。

有所不便,敬请谅解。


推荐阅读