首页 > 解决方案 > Kafka Streams Rocksdb 保留没有使用窗口函数删除旧数据

问题描述

我正在运行具有窗口功能的 Kafka 流应用程序。但运行 24 小时后,本地磁盘使用量从 5G 增加到 20G 并不断增加。根据我的谷歌搜索,一旦我介绍了windowedBy它,它应该会自动删除旧数据。

我的拓扑如下所示:

stream.selectKey(selectKey A)
.groupByKey(..)
.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
.reduce((value1,value2) -> value2)
.suppress.toStreams()
.selectKey(selectKey B).mapValues().filter()
.groupByKey().reduce.toStream().to()

我无法理解的一件事是,从这个拓扑结构中,它将创建两个内部重新分区主题,repartition-03以及repartition-14两个groupBy动作。从磁盘来看,所有正在repartition-03执行任务的机器都具有很高的磁盘使用率,并且似乎永远不会删除旧数据,而正在运行repartition-14任务的机器总是处于低磁盘使用率状态。

当我登录机器时,我发现这两台机器的路径不同,如下所示:

/tmp/kafka-streams/test-group/2_40/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000014
/tmp/kafka-streams/test-group/1_4/KSTREAM-REDUCE-STATE-STORE-0000000003/KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000

为什么他们有不同的路径?2_40用于repartition-14任务,它rocksdb在路径中,而另一个不包含rocksdb. 同时,taks1_4保留了几个类似KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000但后缀不同的文件夹。

我虽然一旦我引入了 windowedBy 函数,rocksdb 会在窗口过期时删除旧数据?为什么上述两个内部重新分区主题具有不同的路径和保留行为?

非常感谢任何帮助!谢谢!

标签: apache-kafka-streamsrocksdb

解决方案


默认保留期为 24 小时。你可以通过减少它

.reduce(..., Materialized.with(...).withRetention(...));

推荐阅读