Azure Event Hubs: QuotaExceededException: 4999 handles limit

Problem description

I am using the Azure Event Hubs library for C# in a WebApi application. I get this exception when sending a message:

"Message":"An error has occurred.",
"ExceptionMessage":"Cannot allocate more handles to the current session or connection. The maximum number of handles allowed is 4999. Please free up resources and try again., referenceId: d975f39c71a14ae5915c9adca322e110_G15"
"ExceptionType":"Microsoft.ServiceBus.Messaging.QuotaExceededException"

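I thought that instantiating EventHubProducerClient once and reusing it, rather than creating an instance for every message sent (with the IAsyncDisposable pattern), would help, as described in "EventHub Exception: Cannot allocate more handle to the current session or connection". But it did not.

I suspect there is a more global problem and that I am missing something.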

    public class EventHubService : SubscriberBase
    {
        private readonly Action<string> errorHandler;
        private readonly BlobContainerClient blobContainerClient;
        private readonly EventProcessorClient eventProcessorClient;
        private readonly EventHubProducerClient eventHubProducerClient;
        private readonly int eventsToCheckpoint;
        private readonly Timer checkpointTimer;
        private int eventsSinceLastCheckpoint;
        private bool shouldUpdateCheckpoint;

        public EventHubService(EventHubSettings settings, Action<string> errorHandler) : base()
        {
            this.errorHandler = errorHandler;
            eventHubProducerClient = new EventHubProducerClient(settings.ConnectionString, settings.EventHubName);

            if (!String.IsNullOrWhiteSpace(settings.GroupId))
            {
                eventManager = new EventManager();

                blobContainerClient = new BlobContainerClient(settings.StorageConnectionString, settings.BlobContainerName);
                eventProcessorClient = new EventProcessorClient(blobContainerClient, settings.GroupId, settings.ConnectionString, settings.EventHubName);

                eventsToCheckpoint = settings.EventsToUpdateCheckpoint;
                checkpointTimer = new Timer(settings.IntervalToUpdateCheckpoint.TotalMilliseconds);
                checkpointTimer.Elapsed += (sender, eventArgs) => shouldUpdateCheckpoint = true;
            }
        }

        public override void Start()
        {
            eventProcessorClient.ProcessErrorAsync += HandleError;
            eventProcessorClient.ProcessEventAsync += ProcessEventData;
            eventProcessorClient.StartProcessingAsync().Wait();
            checkpointTimer.Start();
        }

        public override async Task Stop()
        {
            try
            {
                checkpointTimer.Stop();
                await eventProcessorClient.StopProcessingAsync();
            }
            finally
            {
                eventProcessorClient.ProcessEventAsync -= ProcessEventData;
                eventProcessorClient.ProcessErrorAsync -= HandleError;
            }
        }

        public override async Task Publish(string topic, JObject message)
        {
            using (EventDataBatch eventBatch = await eventHubProducerClient.CreateBatchAsync())
            {
                var @event = new Event(topic, message);
                string json = @event.ToString(Formatting.None);
                byte[] bytes = Encoding.UTF8.GetBytes(json);
                var eventData = new EventData(bytes);
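                // TryAdd returns false when the event does not fit in the batch; do not send an empty batch silently.
                if (!eventBatch.TryAdd(eventData))
                {
                    throw new InvalidOperationException("The event is too large to fit in an empty batch.");
                }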

                await eventHubProducerClient.SendAsync(eventBatch);
            }
        }

        private async Task ProcessEventData(ProcessEventArgs eventArgs)
        {
            if (eventArgs.CancellationToken.IsCancellationRequested)
            {
                return;
            }

            if (++eventsSinceLastCheckpoint >= eventsToCheckpoint)
            {
                eventsSinceLastCheckpoint = 0;
                shouldUpdateCheckpoint = true;
            }

            if (shouldUpdateCheckpoint)
            {
                await eventArgs.UpdateCheckpointAsync();
                shouldUpdateCheckpoint = false;
            }

            string json = Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray());
            var @event = new Event(json);
            eventManager.TryRaise(@event);
        }

        private Task HandleError(ProcessErrorEventArgs eventArgs)
        {
            if (!eventArgs.CancellationToken.IsCancellationRequested)
            {
                errorHandler.Invoke($"[P:{eventArgs.PartitionId}][O:{eventArgs.Operation}] {eventArgs.Exception.Message}");
            }

            return Task.CompletedTask;
        }
    }  

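I found some information in Service Bus Quotas, for example:

Number of concurrent receive requests on a queue, topic, or subscription entity (5000).

Subsequent receive requests are rejected, and the calling code receives an exception. This quota applies to the combined number of concurrent receive operations across all subscriptions on a topic.

But I still cannot figure out how to handle it.

Please help. Thanks.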

Tags: c#, azure-eventhub

Solution


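The answer really is the one given in "EventHub Exception: Cannot allocate more handle to the current session or connection".

I had applied a similar "fix" for the .NET Core Azure Event Hubs library, but I forgot that I was also using the .NET Framework Azure Event Hubs library!

So I now instantiate EventHubProducerClient once and reuse it. It seems to work fine.

My bad. I was not careful enough.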
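
For illustration, here is a minimal sketch of the "create once, reuse" approach using the Azure.Messaging.EventHubs package that the question's code already uses. The class name SharedEventHubPublisher and the method PublishAsync are hypothetical, not part of any library. The same idea should apply to whatever client type the older .NET Framework library exposes, but that part is not shown here.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

// Hypothetical wrapper (name is illustrative): owns exactly one producer
// client for the lifetime of the application.
public sealed class SharedEventHubPublisher : IAsyncDisposable
{
    private readonly EventHubProducerClient producer;

    public SharedEventHubPublisher(string connectionString, string eventHubName)
    {
        // Constructed once. Creating a new client per message (or per
        // request) is the pattern the linked answer warns against.
        producer = new EventHubProducerClient(connectionString, eventHubName);
    }

    public async Task PublishAsync(string json)
    {
        // The batch is short-lived and disposed after each send;
        // the producer client itself is reused across calls.
        using (EventDataBatch batch = await producer.CreateBatchAsync())
        {
            var eventData = new EventData(Encoding.UTF8.GetBytes(json));
            if (!batch.TryAdd(eventData))
            {
                throw new InvalidOperationException("The event is too large to fit in an empty batch.");
            }

            await producer.SendAsync(batch);
        }
    }

    // Dispose the single client once, when the application shuts down.
    public ValueTask DisposeAsync() => producer.DisposeAsync();
}
```

In a WebApi application this would presumably be held as a single shared instance (for example, registered as a singleton in the dependency injection container), so that controllers created per request do not each construct their own client.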

