首页 > 解决方案 > 为什么有时我在 NServicebus 队列上的消息会获得无限的 DeliveryCount?

问题描述

对于我的一些 NServiceBus 集成,我看到我的消息“卡住了”。我的队列接收来自我的传奇之一的消息,通常一切正常。消息进来,它正在被处理,然后从队列中删除。但是,有时它只是停止从队列中删除这些消息。它处理消息(在这种情况下,它们被发送到 SQL-DB),然后将其保留在队列中,将传递计数增加 1。在我注意到这一点之后,我禁用并启用了处理我的 Saga 的 WebJob (有时单个消息的 DeliveryCounts 会达到超过 3000 的值,即使根据文档默认 MaxDeliveryCount 应设置为 6)。这样做可以暂时解决问题,直到它在某个时候再次出现。

一些 WebJobs 在 .NetFrameWork 461 上运行,这些都运行良好。有时“停止工作”的是基于 .NetCore 2.1 构建的。我并不是暗示这是框架的问题,但我的猜测是错误可能与设置端点有关(因为这些版本之间的端点配置有点不同)。

我已经尝试通过向队列发送 15.000 多条消息来复制错误,或者通过禁用 WebJobs 并仅在队列已满时激活它们。没有任何效果,问题只是随机出现。这意味着大多数时候一切都很好,直到那一刻它决定不再发生。

    private async Task<EndpointConfiguration> BuildDefaultConfiguration()
    {
        var environment = this.configuration["Environment"];
        var endpointConfiguration = new EndpointConfiguration(this.endpointName);

        endpointConfiguration.SendHeartbeatTo($"particular.servicecontrol.{environment}");
        endpointConfiguration.SendFailedMessagesTo("error");
        endpointConfiguration.AuditProcessedMessagesTo("audit");

        var host = Environment.GetEnvironmentVariable("WEBSITE_INSTANCE_ID") ?? Environment.MachineName;
        endpointConfiguration
            .UniquelyIdentifyRunningInstance()
            .UsingNames(environment, host)
            .UsingCustomDisplayName(environment);

        var metrics = endpointConfiguration.EnableMetrics();
        metrics.SendMetricDataToServiceControl($"particular.monitoring.{environment}", TimeSpan.FromSeconds(2));

        endpointConfiguration.UseContainer<NinjectBuilder>(customizations =>
        {
            customizations.ExistingKernel(this.kernel);
        });

        endpointConfiguration.ApplyCustomConventions();
        endpointConfiguration.EnableInstallers();
        endpointConfiguration.UseSerialization<NewtonsoftSerializer>();

        var connectionString = this.configuration["ConnectionStrings:ServiceBus"];
        var transportExtensions = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
        transportExtensions.ConnectionString(connectionString);
        transportExtensions.UseWebSockets();
        transportExtensions.PrefetchCount(1);

        // license
        var cloudStorageAccount = CloudStorageAccount.Parse(this.configuration["ConnectionStrings:Storage"]);
        var cloudBlobClient = cloudStorageAccount.CreateCloudBlobClient();
        var cloudBlobContainer = cloudBlobClient.GetContainerReference("configurations");
        await cloudBlobContainer.CreateIfNotExistsAsync().ConfigureAwait(false);
        var blockBlobReference = cloudBlobContainer.GetBlockBlobReference("license.xml");
        endpointConfiguration.License(await blockBlobReference.DownloadTextAsync().ConfigureAwait(false));

        endpointConfiguration.DefineCriticalErrorAction(async context =>
        {
            try
            {
                await context.Stop().ConfigureAwait(false);
            }
            finally
            {
                Environment.FailFast($"Critical error shutting down:'{context.Error}'.", context.Exception);
            }
        });

        return endpointConfiguration;
    }

我已经包含了设置 EndPointConfiguration 的函数。我希望我在这里遗漏了导致错误的东西,但我不知道它是什么。澄清:我实际上没有收到错误。我只是注意到正在处理消息,但没有从队列中删除。

标签: c#azure.net-corenservicebus

解决方案


推荐阅读