apache-kafka - Kafka 在重新平衡期间最后生成和提交的偏移量
问题描述
下面的代码片段使用 kafka 管理客户端来检索某个消费者组(即 CONSUMER_GROUP)的所有分区的最后提交和生成的偏移量:
Map<TopicPartition, OffsetAndMetadata> offsets = admin.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();
for(TopicPartition tp: offsets.keySet()) {
requestLatestOffsets.put(tp, OffsetSpec.latest());
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets = admin.listOffsets(requestLatestOffsets).all().get();
for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) {
String topic = e.getKey().topic();
int partition = e.getKey().partition();
long committedOffset = e.getValue().offset();
long latestOffset = latestOffsets.get(e.getKey()).offset();
System.out.println("Consumer group " + CONSUMER_GROUP + " has committed offset " + committedOffset + " to topic " + topic + " partition " + partition + ". The latest offset in the partition is " + latestOffset + " so consumer group is "+ (latestOffset - committedOffset) + " records behind");
}
您能否就以下几点提示我:
如果消费者组处于重新平衡状态(或另一个不稳定状态),那会(如何)影响返回的最后一个和提交的偏移量?[假设我手动提交偏移量并使用消费者重新平衡侦听器在重新平衡之前提交最新处理的偏移量]
鉴于上面的代码,当管理请求由 Kafka 代理提供服务时,是否有任何特定方法可以知道该组是否正在进行重新平衡(或通常处于不稳定状态)?
如果我有兴趣检索不属于任何消费者组的特定主题分区的最新偏移量,我该如何实现呢?因此,为什么最新的偏移量绑定到特定的消费者组,因为它们是由生产者发送的,而与任何消费者组无关?
谢谢你。
解决方案
推荐阅读
- html - 为什么@font-face 在我的样式表中失败?
- javascript - 用 html 视频对漂亮的 dnd 做出反应,而不是冒泡事件
- reactjs - Azure 应用服务静态网站 routes.json 不起作用
- flutter - 如何捕获 ERROR_REQUIRES_RECENT_LOGIN?
- ruby-on-rails - Rails 6 APi 从用户出生日期过滤用户年龄
- python - 子进程没有捕获标准输出
- python - Python Mixins:*args/**kwargs 与显式调用`__init__`
- swift - 如何从 TabBarController 传输数据
- python - 尝试使用 Flask SqlAlchemy event.listen 在我的表中更改自动增量起始值。但我遇到表不存在
- javascript - 解决循环组合程序问题的程序