java - 获取spring kafka中未处理的消息计数
问题描述
我们正在迁移到 Kafka,我需要创建一个监控 POC 服务,该服务将定期检查 Kafka 队列中未处理的消息计数,并根据计数采取一些措施。但是此服务不得读取或处理消息,指定的消费者会这样做,对于每个 cron,此服务只需要队列中存在的未处理消息的计数。到目前为止,我已经从多个示例中做到了这一点
public void stats() throws ExecutionException, InterruptedException {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Arrays.asList(topicName));
while (true) {
Thread.sleep(1000);
ConsumerRecords<String, String> records = consumer.poll(1000);
if (!records.isEmpty()) {
System.out.println("records is not empty = " + records.count() + " " + records);
}
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
Set<TopicPartition> partitions = consumer.assignment();
//consumer.seekToBeginning(partitions);
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for (TopicPartition partition : offsets.keySet()) {
OffsetAndMetadata commitOffset = consumer.committed(new TopicPartition(partition.topic(), partition.partition()));
Long lag = commitOffset == null ? offsets.get(partition) : offsets.get(partition) - commitOffset.offset();
System.out.println("lag = " + lag);
System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
}
}
}
}
}
代码有时运行良好,有时输出错误,请告诉我
解决方案
不订阅主题;只需创建一个具有相同组的消费者即可获得 endOffsets。
有关示例,请参见此答案。
推荐阅读
- python - GEKKO 错误:在约束和目标内调用函数时出现“没有等式 (=) 或不等式 (>,<) 的方程”
- exception - Symfony 4 - 抛出错误 500 而不是 403
- amazon-web-services - 通过未签名的 POST 到 REST API 调用 AWS Lambda
- php - 我想使用 PHP 删除两个数组中的公共元素
- database - Rust - 我可以让这个柴油 dsl::find() 函数更通用吗?
- java - Java表问题与获取列的平均值
- css - 当下一个 flex 元素有长文本时,Flex 增长和 Flex 填充或收缩不起作用
- html - 具有最少列和行以适应屏幕的 CSS 网格
- asp.net-core - ASP.NET Core:生成控制器基础的 URL(不执行操作)
- c# - UseHttpsRedirection 不能可靠地工作?