java - Azure 事件中心 Java 客户端未处理数据
问题描述
在EventProcessorHost 示例之后,我们在 onEvents() 中实现了我们的自定义逻辑。一些数据没有被处理,我怀疑这是因为 Java 客户端抛出的警告。
在日志中,我们看到 StorageException(用于更新租约或检查点的 Blob 存储超时)、LeaseLostException(可能是由于之前的异常)和 EventHubException(当事件中心移动或短时间脱机时)。
基本上我的问题是:这些异常如何影响事件的处理以及我们如何确保没有事件被跳过(例如,通过异常处理重试和完全关闭作为最后的手段)?
我通读了文档并搜索了其他无法找到满意答案的问题(这个和这个提供了一些见解)。
我们的代码:
public class EventProcessor implements IEventProcessor {
...
@Override
public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception {
for (EventData event : events) {
try {
String message = new String(event.getBytes(), StandardCharsets.UTF_8);
mystuff.process(message);
this.checkpointBatchingCount++;
if ((checkpointBatchingCount % 50) == 0) {
context.checkpoint(data).get();
}
} catch (Exception e) {
LOG.warn("Processing event failed: {}", e.getMessage())
}
}
}
...
}
解决方案
根据我对 EventProcessor 的理解,您将重新处理事件而不是丢失事件。可能还有另一个潜在的问题。
当您调用时checkpoint
,它会保留序列号(偏移量等)流,以EventData
表示“我已经处理了这个”。
当您获得 aStorageException
时,这意味着序列号未成功持久化,因此旧事件的序列号将继续存在于您的 blob 存储中。如果您遇到EventHubException
处理器在重新启动时断开连接的情况,它将尝试声明任何已过期的租约并从最后一个成功的检查点开始处理。
如果另一个事件处理器“窃取”了您当前正在处理的分区,LeaseLostException
您将得到。当有多个 EventProcessor 实例正在运行并且客户端尝试平衡正在运行的实例之间的分区数量时,就会发生这种情况。
推荐阅读
- javascript - 无法在 NetCore 2 上添加 ReactJS.NET
- netsuite - NetSuite - 保存的搜索 - 动态日期范围公式
- azure-resource-manager - 部署 ARM 模板时出现内部服务器错误
- cytoscape.js - 带有 Dagre 布局的 Cytoscape.js 可以绘制垂直树吗?
- java - Eclipselink 尝试在表中插入空值
- scala - 将 List[Any] 中的每个对象转换为特定类型?
- mysql - 如何在mysql中设置本地列表/元组变量
- regex - 如何将正则表达式应用于 MongoDB 字段
- c# - 将 MongoDB.Driver 从 2.7.0 升级到 2.7.1 后连接超时
- javascript - 如何处理 Ajax/POST/PHP