首页 > 解决方案 > Kafka Streams:是否可以使用不同的时间戳进行删除而不是流处理?

问题描述

在 Kafka Streams 2.0 中。

我的用例:能够(部分)重新处理具有事件创建时间的数据(用户从原始数据定义并通过 TimestampExtractor 设置)从重新处理应用程序的历史开始,与长期运行的不间断应用程序一起运行,将数据发送到输出主题(两个应用程序将读取并发送到相同的输出主题,用于构建状态)。

商店是根据这些主题构建的,并且包括按会话的窗口。想象一下,我想为这些主题提供一个月的保留期(用于无序事件和消费) - 在重新处理时,如果使用事件时间,我将处理(并生成)比-月事件。

message.timestamp.type=LogAppendTime根据KIP-32使用以避免删除,将在状态存储中生成错误数据(因为时间戳将不正确,它们将用于例如会话)。

使用事件时间,保持完全保留,并在重新处理完成和使用后应用清除数据,虽然很乏味,但有助于减少主题的大小 - 但是,从它们构建的存储呢?例如,为了在重新处理发生时保存数据,我必须until设置伪无穷大,但 DSL 创建的存储是(或应该是)只读的,而不是被操纵的。

那么,回到标题:

标签: apache-kafkaapache-kafka-streams

解决方案


对于 Streams,LogAppendTime用于重新分区主题是一种未命中配置。另请注意,您不会丢失重新分区主题中的任何数据,因为这些数据是使用保留时间创建的Integer.MAX_VALUE(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-284%3A+Set+default +retention+ms+for+Streams+repartition+topics+to+Long.MAX_VALUE)。Streams 使用purgeDataAPI 在使用后从重新分区主题中删除数据(参见https://issues.apache.org/jira/browse/KAFKA-6150)以避免无限增长。

因此,我建议通过log.message.timestamp.type(即主题级别配置)重新配置所有重新分区主题。


推荐阅读