c# - Azure EventHubs 抛出异常:端点的至少一个接收器是使用 '0' 的纪元创建的,因此不允许使用非纪元接收器
问题描述
介绍
大家好,我们目前正在开发一个使用 Azure EventHubs 和事件在服务之间发送数据的微服务平台。让我们将这些服务命名为:CustomerService、OrderService 和 MobileBFF。
CustomerService 主要发送更新(带有事件),然后由 OrderService 和 MobileBFF 存储,以便能够响应查询,而无需调用 CustomerService 获取此数据。
所有这 3 项服务 + 我们在 DEV 环境中的开发人员都使用相同的 ConsumerGroup 连接到这些事件中心。
我们目前仅使用 1 个分区,但计划稍后扩展到多个。(您可以看到我们的代码已经可以从多个分区中读取)
例外
不过,我们时不时会遇到一个异常(如果它启动,它通常会持续抛出这个错误一个小时左右)。不过,目前我们只在 DEV/TEST 环境中看到过这个错误。
例外:
Azure.Messaging.EventHubs.EventHubsException(ConsumerDisconnected): At least one receiver for the endpoint is created with epoch of '0', and so non-epoch receiver is not allowed. Either reconnect with a higher epoch, or make sure all epoch receivers are closed or disconnected.
EventHub 的所有消费者都将他们的 SequenceNumber 存储在他们自己的数据库中。这允许我们让每个消费者单独消费事件,并将最后处理的序列号存储在它自己的 SQL 数据库中。当服务(重新)启动时,它从数据库加载 SequenceNumber,然后从这里开始请求事件,直到找不到更多事件。然后它会休眠 100 毫秒,然后重试。这是(有些简化的)代码:
var consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
string[] allPartitions = null;
await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))
{
allPartitions = await consumer.GetPartitionIdsAsync(stoppingToken);
}
var allTasks = new List<Task>();
foreach (var partitionId in allPartitions)
{
//This is required if you reuse variables inside a Task.Run();
var partitionIdInternal = partitionId;
allTasks.Add(Task.Run(async () =>
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))
{
EventPosition startingPosition;
using (var testScope = _serviceProvider.CreateScope())
{
var messageProcessor = testScope.ServiceProvider.GetService<EventHubInboxManager<T, EH>>();
//Obtains starting position from the database or sets to "Earliest" or "Latest" based on configuration
startingPosition = await messageProcessor.GetStartingPosition(_inboxOptions.InboxIdentifier, partitionIdInternal);
}
while (!stoppingToken.IsCancellationRequested)
{
bool processedSomething = false;
await foreach (PartitionEvent partitionEvent in consumer.ReadEventsFromPartitionAsync(partitionIdInternal, startingPosition, stoppingToken))
{
processedSomething = true;
startingPosition = await messageProcessor.Handle(partitionEvent);
}
if (processedSomething == false)
{
await Task.Delay(100, stoppingToken);
}
}
}
}
catch (Exception ex)
{
//Log error / delay / retry
}
}
}
}
异常在以下行引发:
await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))
更多调查
上述代码在微服务中运行(在 Azure 中作为 AppServices 托管)
除此之外,我们还运行 1 个 Azure 函数,该函数也从 EventHub 读取事件。(可能使用相同的消费者组)。
根据此处的文档:https ://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups每个消费者组应该可以有 5 个消费者。似乎建议只有一个,但我们不清楚如果我们不遵循此指导会发生什么。
我们确实做了一些测试,手动生成了多个读取事件的服务实例,当有超过 5 个时,这会导致一个不同的错误,它清楚地表明每个消费者组每个分区只能有 5 个消费者(或类似的东西) .
此外,似乎(我们不是 100% 确定)当我们重写代码(如上)以便能够为每个分区生成一个线程时,这个问题就开始发生了。(即使我们在 EventHub 中只有 1 个分区)。编辑:我们进行了更多的日志挖掘,并且在合并代码以为每个分区生成一个线程之前还发现了一些异常。
解决方案
该异常表明有另一个消费者配置为使用相同的消费者组并声明对分区的独占访问。除非您OwnerLevel
在客户端选项中明确设置该属性,否则可能的候选者是至少有一个EventProcessorClient
正在运行。
要进行补救,您可以:
停止针对同一事件中心和使用者组组合运行的任何事件处理器,并确保没有其他使用者显式设置
OwnerLevel
.在专门的消费者组中运行这些消费者;这将允许它们与独家消费者和/或事件处理器共存。
为这些消费者显式设置
OwnerLevel
为 1 或更大;这将声明所有权并强制同一消费者组中的任何其他消费者断开连接。
(注意:根据其他消费者是什么,您可能需要在此处测试不同的值。事件处理器类型使用 0,因此任何高于该值的都将优先。)
推荐阅读
- ubuntu - Nginx 位置匹配以 favicon.ico 结尾的所有内容
- python - 尝试使用文本库创建一个连接 4 游戏,然后稍后实现 GUI
- python - Python将图像文件夹lat和lon获取到df
- node.js - 如何处理nodeJS / express中的promise错误
- javascript - 不能在回调中调用 React Hook “useState”。使用 useMediaQuery 响应式 JS 媒体查询
- spring - Spring MVC 项目结构
- android - 当我滑动时,我的应用程序正在调用所有 TabBarView 内容
- pandas - 对整个 panda 数据框而不是系列进行切片会导致数据类型发生变化,并将第一个字段的值分配给 NaN,发生了什么?
- r - dfSummary() 图形未打印在 HTML 文件中
- algorithm - 使用动态编程和位掩码赢得特殊单淘汰锦标赛的概率