首页 > 解决方案 > 如何为 Kafka 流创建的状态存储设置保留期

问题描述

我正在使用 Streams DSL 并进行有状态聚合(从一个主题读取数据,聚合并将数据写入另一个主题)。如何减少写入状态存储的数据的保留期?现在我的基础设施团队说数据在状态存储中保留了 5 年,我必须减少它。是否有一个特定的配置可以设置数据应该保留多长时间?

    KTable<Windowed<String>, JSONObject> kTable = filteredKstream
            .groupBy((key, value) -> getNewKey(value),
                    Grouped.with(Serdes.String(), new JSONObjectSerde()))
            .windowedBy(windows).aggregate(() -> {
                SampleData sampleData = new SampleData();
                return new JSONObject(mapperUtils.writeValueAsString(sampleData, mapper));
            } , (key, value, aggregate) -> {
                return getAggregateValue(aggregate, value);
            } , Materialized
                    .<String, JSONObject, WindowStore<Bytes, byte[]>> as(
                            "sample-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(jsonSerde));

标签: apache-kafka-streams

解决方案


您可以使用Materialized#withRetention()设置窗口和会话存储的保留期限。

https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/Materialized.html#withRetention-java.time.Duration-


推荐阅读