apache-kafka - Kafka 消费者消费的消息数
问题描述
在我们的应用中有发布者和消费者,通过 swagger 我们可以请求发布者发送特定类型的数据,数据量可以在 10K 到 100K 之间。然后消费者收到相同的内容并进一步处理。
作为一种特定类型的数据集,我们需要知道确切的计数,发布了多少以及消耗了多少。早些时候我们使用了 MQ,所以我们知道 10K 已经发布,现在 MQ 是空的,所以所有的都被消耗了,但在 kafka 中我们无法弄清楚。
有什么办法可以找到这个,我知道我们可以开始计算代码并在某个地方打印/存储,但 Kafka 提供的任何东西?
解决方案
Kafka 不存储此消息计数信息,但它确实存储偏移量,您可以从消费者那里查询以查找它的开始位置、当前位置以及剩余的消息数量。然而,这始终是一个移动的目标。
在伪代码中,您基本上需要在消费者代码周围拥有外部存储/数据库
long count = 0; // or lookup previous value by client-id, topic-partition, etc from some storage
try {
while (consumerRunning) {
// poll ...
for each record {
process(record);
count++;
}
}
} catch (WakeupException e) { // and other exceptions
} finally {
updateCount(count); // save back to storage
}
例如,虽然您可以在 JMX 指标中聚合消费者字节,但这需要外部指标收集过程。
总的来说,在 Kafka 中通常不需要这样做,因为您总是可以重新计算消费者的偏移量(或根本不提交它们),并且消费者可以横向扩展,因此计数可能不一致。
推荐阅读
- node.js - 我应该如何在服务器和客户端设置标头?
- sql - 在 SQL Server 中,如何检查字段是否包含特定范围之外的字符?
- java - 如何将有关类的静态数据存储到
- java - 为什么这种二进制搜索实现会导致 Ruby 中的堆栈溢出而不是 Java?
- javascript - 如何编辑 Google App 脚本以根据输入保存文件名并将其连接到 Google 工作表
- android - 键盘可见时如何从一个编辑文本滚动到另一个编辑文本
- sql - 使用错误处理将整数转换为雪花中的日期
- c# - 类库中的 Blazor 本地化
- c# - 在 .NET Core 3.1 中使用 OIDC 刷新访问令牌,永远不会调用/抛出 OnValidatePrincipal
- powershell - 从一个巨大的 .TMP 文件创建一个小文件(.txt 或 .TMP)