apache-kafka - Kafka Streams:是否可以使用不同的时间戳进行删除而不是流处理?
问题描述
在 Kafka Streams 2.0 中。
我的用例:能够(部分)重新处理具有事件创建时间的数据(用户从原始数据定义并通过 TimestampExtractor 设置)从重新处理应用程序的历史开始,与长期运行的不间断应用程序一起运行,将数据发送到输出主题(两个应用程序将读取并发送到相同的输出主题,用于构建状态)。
商店是根据这些主题构建的,并且包括按会话的窗口。想象一下,我想为这些主题提供一个月的保留期(用于无序事件和消费) - 在重新处理时,如果使用事件时间,我将处理(并生成)比-月事件。
message.timestamp.type=LogAppendTime
根据KIP-32使用以避免删除,将在状态存储中生成错误数据(因为时间戳将不正确,它们将用于例如会话)。
使用事件时间,保持完全保留,并在重新处理完成和使用后应用清除数据,虽然很乏味,但有助于减少主题的大小 - 但是,从它们构建的存储呢?例如,为了在重新处理发生时保存数据,我必须until
设置伪无穷大,但 DSL 创建的存储是(或应该是)只读的,而不是被操纵的。
那么,回到标题:
- 是否有可能(或设想)使用不同的时间戳进行删除而不是流处理?
- 还有其他方法可以更好地解决它吗?
解决方案
对于 Streams,LogAppendTime
用于重新分区主题是一种未命中配置。另请注意,您不会丢失重新分区主题中的任何数据,因为这些数据是使用保留时间创建的Integer.MAX_VALUE
(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-284%3A+Set+default +retention+ms+for+Streams+repartition+topics+to+Long.MAX_VALUE)。Streams 使用purgeData
API 在使用后从重新分区主题中删除数据(参见https://issues.apache.org/jira/browse/KAFKA-6150)以避免无限增长。
因此,我建议通过log.message.timestamp.type
(即主题级别配置)重新配置所有重新分区主题。
推荐阅读
- shopware - 在购物世界中包含 Shopware 5 默认联系表?
- machine-learning - Deploying 和 Serving ML 模型有什么区别?
- php - php 只有变量应该通过引用传递错误
- python - C++ 服务器和 django 作为客户端连接
- javascript - 如何捕捉 postman.setNextRequest 错误
- php - 如何使用 if isset $_POST 输出中的变量?
- r - 如何按字母和数字顺序排序刺痛?
- jasper-reports - Jasper 报告时区问题
- kotlin - Jetpack Compose BasicTextField 中的装饰框
- c# - 如何根据角度计算不同的速度平均值?