apache-kafka - Kafka 的 __consumer_offsets 中的消息时间戳和 commit_timestamp
问题描述
我目前正在开发一个部分依赖于 Apache Kafka(2.2.0 版)的应用程序。我必须做的一件事是跟踪其他消费者提交其当前偏移量的内容(更重要的是何时)。据我所知,仅使用 Java 客户端,没有办法获取已提交偏移量的相关时间戳,因为AdminClient
' 的listConsumerGroupOffsets
方法最终会导致一个OffsetAndMetadata
不包含时间戳的对象。因此,我只是开始阅读来自该__consumer_offsets
主题的消息。如果有更好的方法来做到这一点,请告诉我。
现在,如果一个人直接读取消息__consumer_offsets
,那么一个人突然有两个时间戳。一个是附加到实际提交消息的时间戳,另一个是commit_timestamp
,它是消息内容的一部分。我的第一个想法是其中一个可能由代理设置,另一个可能由提交它的客户端设置(另外,如果您/config/topics/__consumer_offsets
在 ZooKeeper 中查看,它没有指定LogAppendTime
消息时间戳,因此可以假设它只使用默认值)。唉,手动移动系统时间的快速实验表明,两者实际上都是由代理设置的。更重要的是,他们并不总是同意(消息的时间戳有时会稍早于commit_timestamp
)。我试图深入研究 Kafka 代码以准确了解发生了什么,但它相当复杂,而且我对它还不够熟悉,无法快速掌握。所以这是我的问题:
- 为什么即使没有明确指定消息时间戳也会
__consumer_offsets
自动出现?LogAppendTime
只是用于发送提交消息的生产者将时间戳留空吗? - 为什么消息时间戳和消息中
commit_timestamp
包含的时间不一致?我似乎记得曾经在某处读过它曾经可以显式设置commit_timestamp
并因此手动控制已提交偏移量的保留。 - 更重要的是:有任何理由使用其中一个吗?例如,如果仍然可以
commit_timestamp
手动设置,则使用附加到消息的时间戳会更有意义。
我知道这是一个非常具体的问题,对大多数人来说可能并不重要。但直到现在,我总是能够通过使用 Google 并查看 Kafka 的源代码来了解后台发生的事情;然而,这个让我有点难过。因此,非常感谢任何见解。
解决方案
我认为后一个时间戳是到期时间。您可以尝试以下以确保吗?
通过在 中设置参数来设置可访问的内部"exclude.internal.topics=false"
主题consumer.config
。
bin/kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--bootstrap-server localhost:9092 \
--topic __consumer_offsets
我可以看到结果如下:
[mygroup1,mytopic1,11]::[OffsetMetadata[55166421,NO_METADATA],CommitTime 1502060076305,ExpirationTime 1502146476305]
[mygroup1,mytopic1,13]::[OffsetMetadata[55037927,NO_METADATA],CommitTime 1502060076305,ExpirationTime 1502146476305]
[mygroup2,mytopic2,0]::[OffsetMetadata[126,NO_METADATA],CommitTime 1502060076343,ExpirationTime 1502146476343]
我没有在我的机器上安装您在问题中提到的特定版本,所以请检查一下。
推荐阅读
- batch-file - 为什么“移动”命令在我的批处理文件中不起作用?
- android - Android Kotlin 将文件写入“文件”->“文档”-如何获取 URI?
- c++ - 如何使用 cmake 添加不同的头文件夹,以便项目正确编译和运行
- r - 将列添加到大标题
- firebase - React-Native Firebase 推送通知 IOS 仅适用于我的帐户电话 Plz HELP ;(
- firefox - 如何在 Firefox 中使用 ffmpeg.wasm 而不获取 SharedArrayBuffer?
- python - 从 Google Colab 中的 Google Sheets 读取已发布的 CSV 是否有变化?
- python-3.x - 为什么我的函数在我想要返回某些东西时不需要我使用字符串?
- r - 如何保持与另一个数据集的一个列匹配的变量
- python - Cx_freeze:如何解决由多个同名 .dylib 文件引起的冲突