java - Consumer.endOffsets 如何在 Kafka 中工作?
问题描述
假设我有一个无限期运行的计时器任务,它遍历 kafka 集群中的所有消费者组,并为每个组的所有分区输出滞后、提交的偏移量和结束偏移量。与 Kafka 控制台消费者组脚本的工作方式类似,但它适用于所有组。
就像是
单一消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量)
Consumer consumer;
static {
consumer = createConsumer();
}
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
}
}
多个消费者 - 工作
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
Consumer consumer = createConsumer();
consumer.endOffsets(topicParitions); This works!!!
}
}
版本:Kafka-Client 2.0.0
我是否错误地使用了消费者 API?理想情况下,我想使用单一消费者。
如果您需要更多详细信息,请告诉我。
解决方案
我想你快到了。首先收集所有你感兴趣的主题分区,然后发出consumer.endOffsets
命令。
请记住,我没有尝试运行它,但是这样的事情应该可以工作:
run() {
Consumer consumer = createConsumer();
List<String> groupIds = getConsumerGroups();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (String groupId: groupIds) {
topicPartitions.addAll(getTopicPartitions(groupId));
}
consumer.endOffsets(topicPartitions);
}
推荐阅读
- php - GET API 适用于网络浏览器和邮递员,但不适用于 php CURL
- ubuntu - 在 Ubuntu 21.04 上安装 QuantConnect LEAN 时出现分段错误
- python - 熊猫比较数据框值和更新列
- javascript - 有没有一种方法可以从一个图表 C3 js 中显示 3 个图表?
- typescript - vue-router 3.5.1 中的警告:
的 tag 属性已被弃用,并已在 Vue Router 4 中删除 - authentication - 在 MSAL 中获取不同的访问令牌 get token from device code flow
- angular - 如何在 igx-grid 中的行编辑期间以编程方式更新值?
- node.js - Reddit 喜欢评论部分 - MongoDB
- r - 将 ggforest 与 Coxme 主题一起使用
- php - magento 的 php 的 intl 和 zip 包的问题