首页 > 解决方案 > Kafka 的 __consumer_offsets 中的消息时间戳和 commit_timestamp

问题描述

我目前正在开发一个部分依赖于 Apache Kafka(2.2.0 版)的应用程序。我必须做的一件事是跟踪其他消费者提交其当前偏移量的内容(更重要的是何时)。据我所知,仅使用 Java 客户端,没有办法获取已提交偏移量的相关时间戳,因为AdminClient' 的listConsumerGroupOffsets方法最终会导致一个OffsetAndMetadata不包含时间戳的对象。因此,我只是开始阅读来自该__consumer_offsets主题的消息。如果有更好的方法来做到这一点,请告诉我。

现在,如果一个人直接读取消息__consumer_offsets,那么一个人突然有两个时间戳。一个是附加到实际提交消息的时间戳,另一个是commit_timestamp,它是消息内容的一部分。我的第一个想法是其中一个可能由代理设置,另一个可能由提交它的客户端设置(另外,如果您/config/topics/__consumer_offsets在 ZooKeeper 中查看,它没有指定LogAppendTime消息时间戳,因此可以假设它只使用默认值)。唉,手动移动系统时间的快速实验表明,两者实际上都是由代理设置的。更重要的是,他们并不总是同意(消息的时间戳有时会稍早于commit_timestamp)。我试图深入研究 Kafka 代码以准确了解发生了什么,但它相当复杂,而且我对它还不够熟悉,无法快速掌握。所以这是我的问题:

  1. 为什么即使没有明确指定消息时间戳也会__consumer_offsets自动出现?LogAppendTime只是用于发送提交消息的生产者将时间戳留空吗?
  2. 为什么消息时间戳和消息中commit_timestamp包含的时间不一致?我似乎记得曾经在某处读过它曾经可以显式设置commit_timestamp并因此手动控制已提交偏移量的保留。
  3. 更重要的是:有任何理由使用其中一个吗?例如,如果仍然可以commit_timestamp手动设置,则使用附加到消息的时间戳会更有意义。

我知道这是一个非常具体的问题,对大多数人来说可能并不重要。但直到现在,我总是能够通过使用 Google 并查看 Kafka 的源代码来了解后台发生的事情;然而,这个让我有点难过。因此,非常感谢任何见解。

标签: apache-kafkakafka-consumer-api

解决方案


我认为后一个时间戳是到期时间。您可以尝试以下以确保吗?

通过在 中设置参数来设置可访问的内部"exclude.internal.topics=false"主题consumer.config

bin/kafka-console-consumer.sh  --consumer.config /tmp/consumer.config \
     --bootstrap-server localhost:9092 \
     --topic __consumer_offsets

我可以看到结果如下:

[mygroup1,mytopic1,11]::[OffsetMetadata[55166421,NO_METADATA],CommitTime 1502060076305,ExpirationTime 1502146476305]
[mygroup1,mytopic1,13]::[OffsetMetadata[55037927,NO_METADATA],CommitTime 1502060076305,ExpirationTime 1502146476305]
[mygroup2,mytopic2,0]::[OffsetMetadata[126,NO_METADATA],CommitTime 1502060076343,ExpirationTime 1502146476343]

我没有在我的机器上安装您在问题中提到的特定版本,所以请检查一下。


推荐阅读