首页 > 解决方案 > Dotnet 和 Azure 事件中心:按位置检索事件

问题描述

我一直在尝试使用 Microsoft.Azure.EventHubs 按其位置检索事件。

我被告知有一种方法可以使用它的偏移量或序列号来计算事件位置,所以每次我将事件添加到 EventBatch 时,我都会在 redis 中缓存一个 id 以及事件的偏移量和序列号。

然后,每当我想检索某个特定事件时,我会先在 redis 中按 Id 查出它的偏移量和序列号,再尝试凭这些信息在事件中心的事件流中定位并取回该事件。

问题是我拿到的偏移量和序列号都是负的 long 值,我不明白该如何把它们当作索引来使用。

你们知道怎么做吗?

这是我的发布者和检索器类

    /// <summary>
    /// Publishes a single UTF-8 encoded message to an Event Hub and caches the
    /// event's sequence number and offset in Redis under the onboarding id.
    /// </summary>
    public class EventHubPublisher
    {
        /// <summary>
        /// Sends <paramref name="_message"/> to the given Event Hub in a one-event batch and,
        /// once the send has succeeded, records the event's position data in Redis.
        /// </summary>
        /// <param name="_connectionString">Event Hubs namespace connection string.</param>
        /// <param name="_eventHubName">Name of the Event Hub to publish to.</param>
        /// <param name="_message">Message text; encoded as UTF-8 for the event body.</param>
        /// <param name="onboardingid">Redis key under which the position data is stored.</param>
        /// <exception cref="InvalidOperationException">
        /// The event does not fit into a batch.
        /// </exception>
        public static async Task SendMessage(string _connectionString, string _eventHubName, string _message, string onboardingid)
        {
            var producerClient = new EventHubProducerClient(_connectionString, _eventHubName);
            try
            {
                using (var eventBatch = await producerClient.CreateBatchAsync())
                {
                    var data = new EventData(Encoding.UTF8.GetBytes(_message));

                    // TryAdd returns false instead of throwing; without this check an
                    // oversized message would silently result in an empty batch being sent.
                    if (!eventBatch.TryAdd(data))
                    {
                        throw new InvalidOperationException("The event is too large to fit into an Event Hubs batch.");
                    }

                    await producerClient.SendAsync(eventBatch);

                    // Cache only after a successful send so Redis never points at an event
                    // that was not published.
                    // NOTE(review): the surrounding question reports these values as negative;
                    // per the Azure.Messaging.EventHubs docs SequenceNumber/Offset are only
                    // populated on events received from the service, so what is cached here
                    // is most likely a placeholder rather than a usable position - confirm.
                    await addDatatoRedis(data.SequenceNumber, data.Offset, onboardingid);
                }
            }
            finally
            {
                // Release the underlying connection even when sending fails.
                await producerClient.CloseAsync();
            }
        }

        /// <summary>
        /// Stores the sequence number and offset, serialized as a JSON object with string
        /// fields <c>sequence_number</c> and <c>offset</c>, under <paramref name="onboardingid"/>.
        /// </summary>
        /// <param name="sequenceNumber">Sequence number to cache.</param>
        /// <param name="offset">Offset to cache.</param>
        /// <param name="onboardingid">Redis key for the cached entry.</param>
        private static async Task addDatatoRedis(long sequenceNumber, long offset, string onboardingid)
        {
            // The multiplexer is created per call here, so it must also be disposed per call.
            using (var redisConnection = ConnectionMultiplexer.Connect(ConnectionStringsSettings.Properties.Redis))
            {
                var redis = redisConnection.GetDatabase();
                var value = new
                {
                    // Invariant culture keeps the stored text machine-readable regardless of
                    // the host's regional settings (relevant for the negative sign).
                    sequence_number = sequenceNumber.ToString(System.Globalization.CultureInfo.InvariantCulture),
                    offset = offset.ToString(System.Globalization.CultureInfo.InvariantCulture),
                };

                // Awaited so a failed write surfaces to the caller instead of being lost.
                await redis.StringSetAsync(onboardingid, JsonSerializer.Serialize(value));
            }
        }
    }
/// <summary>
/// Work-in-progress retriever: gathers the inputs needed to locate a cached event
/// in an Event Hub partition. It does not yet receive or return the event.
/// </summary>
public class EventHubRetriever
    {
        /// <summary>
        /// Reads the runtime information of partition "0" and builds an
        /// <c>EventPosition</c> from the cached offset of <paramref name="existentEvent"/>.
        /// </summary>
        /// <param name="_connectionString">Event Hubs namespace connection string.</param>
        /// <param name="_eventHubName">Name of the Event Hub; appended as <c>entityPath</c>.</param>
        /// <param name="existentEvent">Cached event JSON; its <c>offset</c> field is read.</param>
        public static async Task GetEvent(string _connectionString, string _eventHubName, JObject existentEvent)
        {
            var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
            try
            {
                // Partition id is hard-coded to "0" in this draft.
                var partitionRuntimeInformation = await client.GetPartitionRuntimeInformationAsync("0");
                var eventPosition = EventPosition.FromOffset(existentEvent["offset"].ToString());

                // 64-bit conversion: a partition offset can exceed the Int32 range.
                var lastEnqueuedOffset = Convert.ToInt64(partitionRuntimeInformation.LastEnqueuedOffset);

                var offset = existentEvent["offset"];

                // TODO: create a receiver at eventPosition and read the event; the values
                // above are collected but not used yet.
            }
            finally
            {
                // Always release the AMQP connection held by the client.
                await client.CloseAsync();
            }
        }
    }

标签: .net, apache-kafka, azure-eventhub

解决方案


好吧,我不确定我的做法是否正确,但它似乎行得通。在发布消息之前,我先通过 GetPartitionRuntimeInformationAsync 获取该分区的 LastEnqueuedSequenceNumber,将其加 1,并把结果作为属性添加到 EventData 中。由于我同时也把事件信息写入 redis 缓存,因此之后可以从缓存中取回它的序列号。

    /// <summary>
    /// Publishes a message to a specific Event Hub partition and caches the predicted
    /// sequence number of the event in Redis so it can be looked up later.
    /// </summary>
    public class EventHubPublisher
    {
        /// <summary>
        /// Sends <paramref name="_message"/> to <paramref name="partition"/> and stores the
        /// predicted sequence number under <paramref name="onboardingid"/> in Redis.
        /// </summary>
        /// <param name="_connectionString">Event Hubs namespace connection string.</param>
        /// <param name="_eventHubName">Name of the Event Hub; appended as <c>entityPath</c>.</param>
        /// <param name="_message">Message text; UTF-8 encoded body and also the "Message" property.</param>
        /// <param name="onboardingid">Event id; stored as the "Id" property and used as Redis key.</param>
        /// <param name="partition">Id of the partition to read runtime info from and to send to.</param>
        public static async Task SendMessage(string _connectionString, string _eventHubName, string _message, string onboardingid, string partition)
        {
            var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
            try
            {
                var partitionRuntimeInformation = await client.GetPartitionRuntimeInformationAsync(partition);

                // NOTE(review): this is a prediction, not a value assigned by the service.
                // Another publisher writing to the same partition between this read and the
                // send below would make it wrong - confirm this is acceptable.
                var sequenceNumber = partitionRuntimeInformation.LastEnqueuedSequenceNumber + 1;

                var data = new EventData(Encoding.UTF8.GetBytes(_message));

                data.Properties.Add("Id", onboardingid);
                data.Properties.Add("Message", _message);
                data.Properties.Add("SequenceNumber", sequenceNumber);

                // Send through a partition sender: the sequence number above was derived
                // from `partition`, so the event has to land in that same partition.
                var sender = client.CreatePartitionSender(partition);
                try
                {
                    await sender.SendAsync(data);
                }
                finally
                {
                    await sender.CloseAsync();
                }

                // NOTE(review): data.Body is the payload segment, so Body.Offset looks like
                // the segment's start index rather than an Event Hubs stream offset - confirm.
                await addDatatoRedis(onboardingid, sequenceNumber, data.Body.Offset, _eventHubName);
            }
            finally
            {
                // Always release the AMQP connection held by the client.
                await client.CloseAsync();
            }
        }

        /// <summary>
        /// Stores a JSON object with <c>offset</c>, <c>id</c>, <c>sequenceNumber</c> and
        /// <c>topic</c> under <paramref name="onboardingid"/> in Redis.
        /// </summary>
        /// <param name="onboardingid">Redis key and cached event id.</param>
        /// <param name="sequenceNumber">Sequence number to cache.</param>
        /// <param name="offset">Offset value to cache.</param>
        /// <param name="topic">Event Hub name the event was published to.</param>
        private static async Task addDatatoRedis(string onboardingid, long sequenceNumber, int offset, string topic)
        {
            // The multiplexer is created per call here, so it must also be disposed per call.
            using (var redisConnection = ConnectionMultiplexer.Connect(ConnectionStringsSettings.Properties.Redis))
            {
                var redis = redisConnection.GetDatabase();
                var value = new
                {
                    offset = offset,
                    id = onboardingid,
                    sequenceNumber = sequenceNumber,
                    topic = topic
                };

                // Awaited so a failed write surfaces to the caller instead of being lost.
                await redis.StringSetAsync(onboardingid, JsonSerializer.Serialize(value));
            }
        }
    }

然后,在从 Event Hub 检索事件时,我可以从缓存的事件信息中取出 sequenceNumber,并以它作为位置在 Event Hub 上获取对应的事件。

    /// <summary>
    /// Retrieves a previously published event from an Event Hub using the sequence
    /// number (and, when available, the id) cached for it.
    /// </summary>
    public class EventHubRetriever
    {
        /// <summary>
        /// Looks up the event described by <paramref name="existentEvent"/> across all
        /// partitions of the Event Hub.
        /// </summary>
        /// <param name="_connectionString">Event Hubs namespace connection string.</param>
        /// <param name="_eventHubName">Name of the Event Hub; appended as <c>entityPath</c>.</param>
        /// <param name="existentEvent">
        /// Cached event JSON. <c>sequenceNumber</c> is required; <c>id</c> is used to
        /// disambiguate between partitions when present.
        /// </param>
        /// <returns>The matching event.</returns>
        /// <exception cref="EventRetrieveException">
        /// Any failure while retrieving, including no matching event being found.
        /// </exception>
        public static async Task<EventData> GetEvent(string _connectionString, string _eventHubName, JObject existentEvent)
        {
            EventHubClient client = null;
            try
            {
                client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
                var eventHubRunTimeInformation = await client.GetRuntimeInformationAsync();
                var partitionsIds = eventHubRunTimeInformation.PartitionIds;

                var sequenceNumber = (long)existentEvent["sequenceNumber"];
                var expectedId = (string)existentEvent["id"];

                return await GetEventByPosition(client, sequenceNumber, partitionsIds, expectedId);
            }
            catch (Exception exc)
            {
                throw new EventRetrieveException(exc, "An error ocurred while retrieving data from Event Hub.");
            }
            finally
            {
                if (client != null)
                {
                    // Always release the AMQP connection held by the client.
                    await client.CloseAsync();
                }
            }
        }

        /// <summary>
        /// Reads one event at <paramref name="sequenceNumber"/> (inclusive) from each
        /// partition in turn and returns the first one that is the requested event.
        /// </summary>
        /// <param name="client">Open Event Hub client used to create receivers.</param>
        /// <param name="sequenceNumber">Sequence number the event was cached with.</param>
        /// <param name="partitionsIds">Ids of the partitions to search.</param>
        /// <param name="expectedId">Cached event id, or <c>null</c> to match on sequence number only.</param>
        /// <exception cref="InvalidOperationException">No partition returned a matching event.</exception>
        private static async Task<EventData> GetEventByPosition(EventHubClient client, long sequenceNumber, string[] partitionsIds, string expectedId)
        {
            var eventPosition = EventPosition.FromSequenceNumber(sequenceNumber, true);

            foreach (var partitionId in partitionsIds)
            {
                // The first argument is the consumer group, not the Event Hub name.
                var receiver = client.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, eventPosition);
                try
                {
                    // The result may be null or empty when nothing arrives within the wait time.
                    var received = await receiver.ReceiveAsync(1);
                    var candidate = received?.FirstOrDefault();
                    if (candidate != null && IsRequestedEvent(candidate, sequenceNumber, expectedId))
                    {
                        return candidate;
                    }
                }
                finally
                {
                    await receiver.CloseAsync();
                }
            }

            throw new InvalidOperationException($"No event with sequence number {sequenceNumber} was found in any partition.");
        }

        /// <summary>
        /// Checks that an event carries the requested sequence number and, when an id was
        /// cached, the same "Id" property. Sequence numbers are per partition, so the id
        /// check is what tells partitions apart.
        /// </summary>
        private static bool IsRequestedEvent(EventData candidate, long sequenceNumber, string expectedId)
        {
            if (candidate.SystemProperties == null || candidate.SystemProperties.SequenceNumber != sequenceNumber)
            {
                return false;
            }

            if (expectedId == null)
            {
                return true;
            }

            object idValue;
            return candidate.Properties.TryGetValue("Id", out idValue)
                && string.Equals(idValue as string, expectedId, StringComparison.Ordinal);
        }
    }


推荐阅读