apache-kafka - 基于offset-id跨集群的kafka消费管理
问题描述
我有一些 apache Kafka 消费者代码正在运行,并获取用户详细信息和进程并相应地更新。
目前我正在使用 kafka 偏移量来跟踪我正确处理的记录。
每当我的消费者由于任何原因重新启动时(一个节点出现故障,其他节点获取数据,或消费者重新启动等),它首先根据处理的偏移量设置 Kafka 偏移量读取
consumer.seek(//get the offset from db);
并开始投票
consumer.poll()
现在的问题是由于一些不同区域的失败测试,相同的应用程序将在其他地方运行并开始处理新数据。
即数据库是全局同步的,但是不同区域的kafka集群之间没有同步,所以我得到了偏移量,它没有针对不同区域的相同主题进行排序。
因此,我最终在其他地区寻求不同的偏移量。
每当发生故障转移时,第一个集群中的数据不会被转移到第二个集群,这可以根据业务需求进行。
当前的问题是,当新记录到达第二个集群时,我不应该从第一个集群应用程序设置的偏移量开始,这可以通过将偏移量与 Kafka clusterID(name) 一起保存来管理,因此每当寻找偏移量时,我都可以与集群一起查询获得基于区域的偏移量。
有没有更好的方法来处理这种情况?
解决方案
推荐阅读
- c# - 公共函数返回一个列表c#
- html - 造型类星体元素按钮工具
- gitlab - run a gitlab-ci stage only if changes or previous fail
- postgresql - 不同的索引用于具有不同 companyid 的相同查询
- android - Firebase firestore database returns null, the Query Snapshot value is empty but I have data stored on firestore, why is this returning null?
- web3js - how can I use encodeFunctionCall()?
- javascript - Sort JSON string by attribute in JavaScript
- django - Filter query on first many-to-many item
- pycharm - Pycharm 无法读取某些代码。我怎么解决这个问题?
- c# - 如何将原始类型映射到实现接口的对象的属性