首页 > 解决方案 > Kafka 消费者消费的消息数

问题描述

在我们的应用中有发布者和消费者,通过 swagger 我们可以请求发布者发送特定类型的数据,数据量可以在 10K 到 100K 之间。然后消费者收到相同的内容并进一步处理。

作为一种特定类型的数据集,我们需要知道确切的计数,发布了多少以及消耗了多少。早些时候我们使用了 MQ,所以我们知道 10K 已经发布,现在 MQ 是空的,所以所有的都被消耗了,但在 kafka 中我们无法弄清楚。

有什么办法可以找到这个,我知道我们可以开始计算代码并在某个地方打印/存储,但 Kafka 提供的任何东西?

标签: apache-kafka

解决方案


Kafka 不存储此消息计数信息,但它确实存储偏移量,您可以从消费者那里查询以查找它的开始位置、当前位置以及剩余的消息数量。然而,这始终是一个移动的目标。

在伪代码中,您基本上需要在消费者代码周围拥有外部存储/数据库

long count = 0;  // or lookup previous value by client-id, topic-partition, etc from some storage
try {
  while (consumerRunning) {
    // poll ...
    for each record {
        process(record);
        count++;
    }
  }
} catch (WakeupException e) { // and other exceptions

} finally {
  updateCount(count); // save back to storage
}

例如,虽然您可以在 JMX 指标中聚合消费者字节,但这需要外部指标收集过程。

总的来说,在 Kafka 中通常不需要这样做,因为您总是可以重新计算消费者的偏移量(或根本不提交它们),并且消费者可以横向扩展,因此计数可能不一致。


推荐阅读