c# - 为什么有时我在 NServicebus 队列上的消息会获得无限的 DeliveryCount?
问题描述
对于我的一些 NServiceBus 集成,我看到我的消息“卡住了”。我的队列接收来自我的传奇之一的消息,通常一切正常。消息进来,它正在被处理,然后从队列中删除。但是,有时它只是停止从队列中删除这些消息。它处理消息(在这种情况下,它们被发送到 SQL-DB),然后将其保留在队列中,将传递计数增加 1。在我注意到这一点之后,我禁用并启用了处理我的 Saga 的 WebJob (有时单个消息的 DeliveryCounts 会达到超过 3000 的值,即使根据文档默认 MaxDeliveryCount 应设置为 6)。这样做可以暂时解决问题,直到它在某个时候再次出现。
一些 WebJobs 在 .NetFrameWork 461 上运行,这些都运行良好。有时“停止工作”的是基于 .NetCore 2.1 构建的。我并不是暗示这是框架的问题,但我的猜测是错误可能与设置端点有关(因为这些版本之间的端点配置有点不同)。
我已经尝试通过向队列发送 15.000 多条消息来复制错误,或者通过禁用 WebJobs 并仅在队列已满时激活它们。没有任何效果,问题只是随机出现。这意味着大多数时候一切都很好,直到那一刻它决定不再发生。
private async Task<EndpointConfiguration> BuildDefaultConfiguration()
{
var environment = this.configuration["Environment"];
var endpointConfiguration = new EndpointConfiguration(this.endpointName);
endpointConfiguration.SendHeartbeatTo($"particular.servicecontrol.{environment}");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.AuditProcessedMessagesTo("audit");
var host = Environment.GetEnvironmentVariable("WEBSITE_INSTANCE_ID") ?? Environment.MachineName;
endpointConfiguration
.UniquelyIdentifyRunningInstance()
.UsingNames(environment, host)
.UsingCustomDisplayName(environment);
var metrics = endpointConfiguration.EnableMetrics();
metrics.SendMetricDataToServiceControl($"particular.monitoring.{environment}", TimeSpan.FromSeconds(2));
endpointConfiguration.UseContainer<NinjectBuilder>(customizations =>
{
customizations.ExistingKernel(this.kernel);
});
endpointConfiguration.ApplyCustomConventions();
endpointConfiguration.EnableInstallers();
endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
var connectionString = this.configuration["ConnectionStrings:ServiceBus"];
var transportExtensions = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
transportExtensions.ConnectionString(connectionString);
transportExtensions.UseWebSockets();
transportExtensions.PrefetchCount(1);
// license
var cloudStorageAccount = CloudStorageAccount.Parse(this.configuration["ConnectionStrings:Storage"]);
var cloudBlobClient = cloudStorageAccount.CreateCloudBlobClient();
var cloudBlobContainer = cloudBlobClient.GetContainerReference("configurations");
await cloudBlobContainer.CreateIfNotExistsAsync().ConfigureAwait(false);
var blockBlobReference = cloudBlobContainer.GetBlockBlobReference("license.xml");
endpointConfiguration.License(await blockBlobReference.DownloadTextAsync().ConfigureAwait(false));
endpointConfiguration.DefineCriticalErrorAction(async context =>
{
try
{
await context.Stop().ConfigureAwait(false);
}
finally
{
Environment.FailFast($"Critical error shutting down:'{context.Error}'.", context.Exception);
}
});
return endpointConfiguration;
}
我已经包含了设置 EndPointConfiguration 的函数。我希望我在这里遗漏了导致错误的东西,但我不知道它是什么。澄清:我实际上没有收到错误。我只是注意到正在处理消息,但没有从队列中删除。
解决方案
推荐阅读
- realm - 领域 js:访问无效的结果对象
- java - 启动时决定的 URL 的 RequestMapping
- r - 为什么我在使用 RWEKA 包时出现错误
- python - Python url 中的通配符模式
- python - 使用带有 Excel 工作表数据的 Pandas 获取基于用户 ID 的数据?
- java - 为什么从 Firebase 让 Uid 不起作用?
- swift - 如何更改 CVPixelBuffer 数据格式
- c# - 发布传递空模型值以及如何正确删除或修复它?
- javascript - 由于 angulat6 中的撇号符号而无法显示图像
- java - 如何将android库项目转换为fat jar