.net - Dotnet 和 Azure 事件中心:按位置检索事件
问题描述
我一直在尝试使用 Microsoft.Azure.EventHubs 按其位置检索事件。
我被告知有一种方法可以使用它的偏移量或序列号来计算事件位置,所以每次我将事件添加到 EventBatch 时,我都会在 redis 中缓存一个 id 以及事件的偏移量和序列号。
然后,每当我想检索特定事件时,我都会在 redis 上搜索它的 Id,检索它的偏移量和序列号,并可能在事件中心流中检索它。
问题是偏移量和序列号是负长的,我无法理解如何将其用作索引。
你们知道怎么做吗?
这是我的发布者和检索器类
public class EventHubPublisher
{
public static async Task SendMessage(string _connectionString, string _eventHubName, string _message, string onboardingid)
{
var producerClient = new EventHubProducerClient(_connectionString, _eventHubName);
var eventBatch = await producerClient.CreateBatchAsync();
var data = new EventData(Encoding.UTF8.GetBytes(_message));
eventBatch.TryAdd(data);
addDatatoRedis(data.SequenceNumber,data.Offset,onboardingid);
await producerClient.SendAsync(eventBatch);
}
private static void addDatatoRedis(long sequenceNumber, long offset, string onboardingid)
{
try
{
var redisConnection = ConnectionMultiplexer.Connect(ConnectionStringsSettings.Properties.Redis);
var redis = redisConnection.GetDatabase();
var value = new
{
sequence_number = sequenceNumber.ToString(),
offset = offset.ToString(),
};
redis.StringSetAsync(onboardingid, JsonSerializer.Serialize(value));
}
catch (Exception)
{
throw;
}
}
}
public class EventHubRetriever
{
public static async Task GetEvent(string _connectionString, string _eventHubName, JObject existentEvent)
{
try
{
var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
var partitionRuntimeInformation = await client.GetPartitionRuntimeInformationAsync("0");
var eventHubRunTimeInformation = await client.GetRuntimeInformationAsync();
var eventPosition = EventPosition.FromOffset(existentEvent["offset"].ToString());
var lastEnqueuedOffset = Convert.ToInt32(partitionRuntimeInformation.LastEnqueuedOffset);
var offset = existentEvent["offset"];
// var offsetPosition = lastEnqueuedOffset + offset;
// var receiver = EventHubClient.Create();
}
catch (System.Exception)
{
throw;
}
}
}
解决方案
好吧,我不确定我所做的是否是正确的方法,但它似乎正在奏效。当我要发布消息时,我从 GetPartitionRunTime 中获取 lastEnqueuedNumber,加 1 并将其作为属性添加到 eventData。由于我还将事件添加到 redis 缓存,因此我能够检索它的序列号。
public class EventHubPublisher
{
public static async Task SendMessage(string _connectionString, string _eventHubName, string _message, string onboardingid, string partition)
{
var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
var partitionRuntimeInformation = await client.GetPartitionRuntimeInformationAsync(partition);
var sequenceNumber = partitionRuntimeInformation.LastEnqueuedSequenceNumber + 1;
var data = new EventData(Encoding.UTF8.GetBytes(_message));
data.Properties.Add("Id", onboardingid);
data.Properties.Add("Message", _message);
data.Properties.Add("SequenceNumber", sequenceNumber);
await client.SendAsync(data);
addDatatoRedis(onboardingid, sequenceNumber, data.Body.Offset, _eventHubName);
}
private static void addDatatoRedis(string onboardingid, long sequenceNumber, int offset, string topic)
{
try
{
var redisConnection = ConnectionMultiplexer.Connect(ConnectionStringsSettings.Properties.Redis);
var redis = redisConnection.GetDatabase();
var value = new
{
offset = offset,
id = onboardingid,
sequenceNumber = sequenceNumber,
topic = topic
};
redis.StringSetAsync(onboardingid, JsonSerializer.Serialize(value));
}
catch (Exception)
{
throw;
}
}
然后,当从 EventHub 检索事件时,我能够从缓存的事件中获取 sequenceNumber,并通过其索引获取 eventthub 上的事件。
public class EventHubRetriever
{
public static async Task<EventData> GetEvent(string _connectionString, string _eventHubName, JObject existentEvent)
{
try
{
var client = EventHubClient.CreateFromConnectionString($"{_connectionString};entityPath={_eventHubName}");
var eventHubRunTimeInformation = await client.GetRuntimeInformationAsync();
var partitionsIds = eventHubRunTimeInformation.PartitionIds;
var sequenceNumber = (long)existentEvent["sequenceNumber"];
var retrievedEvent = GetEventByPosition(client, sequenceNumber, partitionsIds);
return retrievedEvent;
}
catch (Exception exc)
{
throw new EventRetrieveException(exc, "An error ocurred while retrieving data from Event Hub.");
}
}
private static EventData GetEventByPosition(EventHubClient client, long sequenceNumber, string[] partitionsIds, string eventHubName)
{
var eventPosition = EventPosition.FromSequenceNumber(sequenceNumber, true);
var partitionReceivers = partitionsIds
.Select(id => client.CreateReceiver(eventHubName, id, eventPosition));
var events = partitionReceivers.Select(receiver => receiver.ReceiveAsync(1)).SelectMany(x => x.Result).ToList();
return events.ElementAt(0);
}
}
推荐阅读
- javascript - 带有 jquery.validate 和 jquery-form(AJAX 和 JSON)的 PHP Formmailer
- node.js - 无服务器框架、谷歌云函数和 Firestore 事件触发器
- python - 我需要登录表单的哪些元素来执行此网络抓取任务?
- javascript - 使用逗号和数据表小数
- python - 确定值是否为字符串的最有效方法
- javascript - Cache.match(request) 在服务工作者中返回未定义的 JSONP 请求
- javascript - 使用脚本根据主面板设置左右面板宽度
- python - 如何在Python中打印货币符号和n位十进制数
- ubuntu - 工作人员连接的 Nginx 500 内部错误是不够的
- xml - 将 2 xml 文件与 xslt 合并