首页 > 解决方案 > 基于offset-id跨集群的kafka消费管理

问题描述

我有一些 apache Kafka 消费者代码正在运行,并获取用户详细信息和进程并相应地更新。

目前我正在使用 kafka 偏移量来跟踪我正确处理的记录。

每当我的消费者由于任何原因重新启动时(一个节点出现故障,其他节点获取数据,或消费者重新启动等),它首先根据处理的偏移量设置 Kafka 偏移量读取

consumer.seek(//get the offset from db);

并开始投票

consumer.poll()

现在的问题是由于一些不同区域的失败测试,​​相同的应用程序将在其他地方运行并开始处理新数据。

即数据库是全局同步的,但是不同区域的kafka集群之间没有同步,所以我得到了偏移量,它没有针对不同区域的相同主题进行排序。

因此,我最终在其他地区寻求不同的偏移量。

每当发生故障转移时,第一个集群中的数据不会被转移到第二个集群,这可以根据业务需求进行。

当前的问题是,当新记录到达第二个集群时,我不应该从第一个集群应用程序设置的偏移量开始,这可以通过将偏移量与 Kafka clusterID(name) 一起保存来管理,因此每当寻找偏移量时,我都可以与集群一起查询获得基于区域的偏移量。

有没有更好的方法来处理这种情况?

标签: apache-kafkakafka-consumer-api

解决方案


推荐阅读