首页 > 解决方案 > 如何在 c# 中接收来自 Eventhub 的最新消息

问题描述

我想收到来自 eventthub 的最新消息,因为之前有太多消息发送到 eventthub。我设置了自己的消费组,但似乎只能从早期到现在接收消息。

有什么方法可以将消息跳过到最新消息?

标签: c#azureazure-eventhub

解决方案


如果您知道azure eventthubs 中的检查点,这应该很容易实现。

简而言之,checkpoint是一个存储在azure blob storage中的文件,每次从eventthub读取数据时,都会在checkpoint中记录偏移量。并且当下次使用同一个检查点从 eventthubs 读取数据时,它将从偏移量开始读取。

所以如果你想跳过读取旧数据,你可以先像这样创建一个测试接收器项目并设置检查点。然后下次,在您的生产中,您可以使用相同的检查点,因为偏移量,它总是会跳过旧数据。

另一种方法是,您可以使用EventProcessorOptionswithRegisterEventProcessorAsync方法。当您选择使用此方法时,您需要手动从 azure blob storage 中删除检查点,否则此设置将被检查点覆盖。

如下示例,在您的接收器方法中:

        private static async Task MainAsync(string[] args)
        {
            Console.WriteLine("Registering EventProcessor...");

            var eventProcessorHost = new EventProcessorHost(
                EventHubName,
                PartitionReceiver.DefaultConsumerGroupName,
                EventHubConnectionString,
                StorageConnectionString,
                StorageContainerName);

            //here, you can get only the data sent in event hub in the recent an hour.
            var options = new EventProcessorOptions
            {

                InitialOffsetProvider = (partitionId) => EventPosition.FromEnqueuedTime(DateTime.UtcNow.AddHours(-1))
            };


            // Registers the Event Processor Host and starts receiving messages                

            await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options);

            Console.WriteLine("Receiving. Press ENTER to stop worker.");
            Console.ReadLine();

            // Disposes of the Event Processor Host
            await eventProcessorHost.UnregisterEventProcessorAsync();
        }

推荐阅读