首页 > 解决方案 > 获取spring kafka中未处理的消息计数

问题描述

我们正在迁移到 Kafka,我需要创建一个监控 POC 服务,该服务将定期检查 Kafka 队列中未处理的消息计数,并根据计数采取一些措施。但是此服务不得读取或处理消息,指定的消费者会这样做,对于每个 cron,此服务只需要队列中存在的未处理消息的计数。到目前为止,我已经从多个示例中做到了这一点

 public void stats() throws ExecutionException, InterruptedException {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topicName));
        while (true) {
            Thread.sleep(1000);
            ConsumerRecords<String, String> records = consumer.poll(1000); 
            if (!records.isEmpty()) {
                System.out.println("records is not empty = " + records.count() + " " + records);
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                Set<TopicPartition> partitions = consumer.assignment();
                //consumer.seekToBeginning(partitions);
                Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
                for (TopicPartition partition : offsets.keySet()) {
                    OffsetAndMetadata commitOffset = consumer.committed(new TopicPartition(partition.topic(), partition.partition()));
                    Long lag = commitOffset == null ? offsets.get(partition) : offsets.get(partition) - commitOffset.offset();
                    System.out.println("lag = " + lag);
                    System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
                }
            }
        }
    }
}

代码有时运行良好,有时输出错误,请告诉我

标签: javaapache-kafkaspring-kafka

解决方案


不订阅主题;只需创建一个具有相同组的消费者即可获得 endOffsets。

有关示例,请参见此答案。


推荐阅读