c# - 如何在 c# 中接收来自 Eventhub 的最新消息
问题描述
我想收到来自 eventthub 的最新消息,因为之前有太多消息发送到 eventthub。我设置了自己的消费组,但似乎只能从早期到现在接收消息。
有什么方法可以将消息跳过到最新消息?
解决方案
如果您知道azure eventthubs 中的检查点,这应该很容易实现。
简而言之,checkpoint是一个存储在azure blob storage中的文件,每次从eventthub读取数据时,都会在checkpoint中记录偏移量。并且当下次使用同一个检查点从 eventthubs 读取数据时,它将从偏移量开始读取。
所以如果你想跳过读取旧数据,你可以先像这样创建一个测试接收器项目并设置检查点。然后下次,在您的生产中,您可以使用相同的检查点,因为偏移量,它总是会跳过旧数据。
另一种方法是,您可以使用EventProcessorOptions
withRegisterEventProcessorAsync
方法。当您选择使用此方法时,您需要手动从 azure blob storage 中删除检查点,否则此设置将被检查点覆盖。
如下示例,在您的接收器方法中:
private static async Task MainAsync(string[] args)
{
Console.WriteLine("Registering EventProcessor...");
var eventProcessorHost = new EventProcessorHost(
EventHubName,
PartitionReceiver.DefaultConsumerGroupName,
EventHubConnectionString,
StorageConnectionString,
StorageContainerName);
//here, you can get only the data sent in event hub in the recent an hour.
var options = new EventProcessorOptions
{
InitialOffsetProvider = (partitionId) => EventPosition.FromEnqueuedTime(DateTime.UtcNow.AddHours(-1))
};
// Registers the Event Processor Host and starts receiving messages
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options);
Console.WriteLine("Receiving. Press ENTER to stop worker.");
Console.ReadLine();
// Disposes of the Event Processor Host
await eventProcessorHost.UnregisterEventProcessorAsync();
}
推荐阅读
- r - 如何使用多边形函数获得密度图?我有多个组要绘制
- api - 是否可以按类型过滤 GitHub REST API 事件?
- c# - 部分构建 Unity3D 编辑器扩展
- java - objectmapper 如何转换为我的对象,它是否创建了新实例
- javascript - 链接多个 promise.all() 语句
- batch-file - 用bat文件删除txt的特定部分
- python - 如果列名包含某些子字符串,如何更改列名?
- android - 如何在 Android 中显示文件目录?
- scala - Spark 中允许执行更新和删除操作的替代选项
- php - 棘轮服务器实例化:“不支持声明‘strict_types’”