apache-kafka - 卡夫卡流时间戳提取器
问题描述
大家好,我有一个关于 TimestampExtractor 和 Kafka Streams 的问题......
在我们的应用程序中,可能会接收到乱序事件,因此我喜欢根据有效负载内的业务日期而不是它们放置在主题中的时间点来对事件进行排序。
为此,我编写了一个自定义 TimestampExtractor,以便能够从有效负载中提取时间戳。直到我在这里告诉的一切都运行良好,但是当我为这个主题构建 KTable 时,我发现我收到的事件发生了故障(从业务的角度来看,它不是最后一个事件,而是最后收到的)显示为对象的最后状态,而 ConsumerRecord 具有来自有效负载的时间戳。
我不知道假设 Kafka Stream 将使用 TimestampExtractor 解决这个乱序问题可能是我的错误。
然后在调试过程中我看到如果 TimestampExtractor 返回 -1 作为结果 Kafka Streams 忽略消息并且 TimestampExtractor 还提供最后接受的事件的时间戳,所以我构建了一个实现以下检查的逻辑(payloadTimestamp < previousTimestamp)返回 -1 ,这实现了我想要的逻辑,但我不确定我是否在危险水域航行。
我是否允许处理这样的逻辑或存在其他方法来处理 Kafka 流中的乱序事件......
谢谢回答..
解决方案
目前(Kafka 2.0),KTable
更新时不考虑时间戳,因为假设输入主题中没有乱序数据。这种假设的原因是“单写者原则”——假设对于压缩的 KTable 输入主题,每个键只有一个生产者,因此,不会有任何乱序数据关于单键。
这是一个已知问题:https ://issues.apache.org/jira/browse/KAFKA-6521
为了您的修复:执行此“hack”不是 100% 正确或安全的:
- 首先,假设您有两条不同的消息,带有两个不同的 key
<key1, value1, 5>, <key2, value2, 3>
。与时间戳为 5 的第一条记录相比,时间戳为 3 的第二条记录晚。但是,两者都有不同的键,因此,您实际上希望将第二条记录放入 KTable 中。只有当您有两个具有相同键的记录时,您才希望删除迟到的数据 IHMO。 - 其次,如果您有两条记录具有相同的键,而第二条记录乱序,并且您在处理第二条记录之前崩溃,则会
TimestampExtractor
丢失第一条记录的时间戳。因此,在重新启动时,它不会丢弃乱序记录。
要做到这一点,您将需要在应用程序逻辑中“手动”过滤,而不是无状态和与键无关的过滤TimestampExtractor
。builder#table()
您可以将其作为流读取,而不是通过读取数据,然后应用一个.groupByKey().reduce()
来构建KTable
. 在您Reducer
的逻辑中,您比较新旧记录的时间戳并返回具有较大时间戳的记录。
推荐阅读
- java - 如何更改相机在 LibGdx 中的位置?
- scala - 如何从 scala 列表中制作嵌套地图
- reactjs - ReactJS 主题建议
- python - 如何在循环中将列表项添加到我的 URL 结构中?
- r - 将日期向量强制转换为 R 中的字符串?
- python - 选择数据框的日期并循环日期索引
- postgresql - 修复重复行以遵守约束
- java - 自动装配 bean 上的空指针,不被 mockito 模拟
- r - 警告:loadNamespace 中的错误:没有名为“leafletsector”的包
- javascript - 同一个 iframe 中的 Typeform 和 Calendly 重定向