首页 > 解决方案 > Kafka Streams Transformer context.timestamp()没有返回正确的时间戳

问题描述

我的用例的一些背景知识:我必须将特定记录(由唯一类型标识)缓冲几秒钟,因为我需要来自随后跟随它的另一个事件的一些信息。第二个事件几乎每次都在 5 秒内发生。为此,我通过使用状态存储来实现缓冲,在该状态存储中我将第一条记录缓冲 5 秒。我将记录作为 (key, Pair<Value,T1>) 放入 statestore,其中 T1 是我处理此记录时的时间戳。如果我在 5 秒内没有得到后续记录,那么我只想转发这个等待记录。一旦我将这条记录放入 statestore。我运行了一个预定的标点符号,在该标点符号中我转发了在 5 秒超时后过期的记录。

为此,我使用挂钟时间戳提取器。我正在通过处理器 api 在自定义 Transformer 中完成所有这些工作。我尝试通过调用 context.timestamp() 来获取时间戳(在我的示例中为 T1)。但是,transform 方法中的这个 context.timestamp() 不会返回当前系统时间戳。这是 transform 方法中的一个这样的日志

调试 [19:51:33.815] - 使用 key=** value={**} 和 timestamp=1621392502699 记录。
日志中的此时间戳来自 context.timestamp(),其值为 2021 年 5 月 19 日,星期三 2:48:22.699 AM,这与日志消息中显示的当前系统不同,即 19:51:33.815

当调用标点符号时,我也在捕获日志

这是来自标点方法 Punctuating @ timestamp 1621453893556 的时间戳。这与当前系统时间相匹配,即 2021 年 5 月 19 日,星期三 7:51:33.556 PM

我对为什么从 transform 方法调用 context.timestamp() 时没有返回当前系统时间感到困惑,即使文档说此方法将返回 TimestampExtractor 从 ConsumerRecord 提取的时间戳

因为未返回的时间戳是正确的,所以记录在整个 5 秒内都没有被缓冲,并且在标点符号运行时发出

任何帮助表示赞赏

使用的版本是 Kafka-Streams-2.3.1

标签: javaapache-kafkaapache-kafka-streams

解决方案


推荐阅读