首页 > 解决方案 > Azure 事件中心 Java 客户端未处理数据

问题描述

EventProcessorHost 示例之后,我们在 onEvents() 中实现了我们的自定义逻辑。一些数据没有被处理,我怀疑这是因为 Java 客户端抛出的警告。

在日志中,我们看到 StorageException(用于更新租约或检查点的 Blob 存储超时)、LeaseLostException(可能是由于之前的异常)和 EventHubException(当事件中心移动或短时间脱机时)。

基本上我的问题是:这些异常如何影响事件的处理以及我们如何确保没有事件被跳过(例如,通过异常处理重试和完全关闭作为最后的手段)?

我通读了文档并搜索了其他无法找到满意答案的问题(这个这个提供了一些见解)。

我们的代码:

public class EventProcessor implements IEventProcessor {
    ...
    @Override
    public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception {
        for (EventData event : events) {
            try {
                String message = new String(event.getBytes(), StandardCharsets.UTF_8);

                mystuff.process(message);

                this.checkpointBatchingCount++;
                if ((checkpointBatchingCount % 50) == 0) {
                    context.checkpoint(data).get();
                }
            } catch (Exception e) {
                LOG.warn("Processing event failed: {}", e.getMessage())
            }
        }
    }
    ...
}

标签: javaazureazure-eventhub

解决方案


根据我对 EventProcessor 的理解,您将重新处理事件而不是丢失事件。可能还有另一个潜在的问题。

当您调用时checkpoint,它会保留序列号(偏移量等)流,以EventData表示“我已经处理了这个”。

当您获得 aStorageException时,这意味着序列号未成功持久化,因此旧事件的序列号将继续存在于您的 blob 存储中。如果您遇到EventHubException处理器在重新启动时断开连接的情况,它将尝试声明任何已过期的租约并从最后一个成功的检查点开始处理。

如果另一个事件处理器“窃取”了您当前正在处理的分区,LeaseLostException您将得到。当有多个 EventProcessor 实例正在运行并且客户端尝试平衡正在运行的实例之间的分区数量时,就会发生这种情况。


推荐阅读