apache-kafka-streams - 如何为 Kafka 流创建的状态存储设置保留期
问题描述
我正在使用 Streams DSL 并进行有状态聚合(从一个主题读取数据,聚合并将数据写入另一个主题)。如何减少写入状态存储的数据的保留期?现在我的基础设施团队说数据在状态存储中保留了 5 年,我必须减少它。是否有一个特定的配置可以设置数据应该保留多长时间?
KTable<Windowed<String>, JSONObject> kTable = filteredKstream
.groupBy((key, value) -> getNewKey(value),
Grouped.with(Serdes.String(), new JSONObjectSerde()))
.windowedBy(windows).aggregate(() -> {
SampleData sampleData = new SampleData();
return new JSONObject(mapperUtils.writeValueAsString(sampleData, mapper));
} , (key, value, aggregate) -> {
return getAggregateValue(aggregate, value);
} , Materialized
.<String, JSONObject, WindowStore<Bytes, byte[]>> as(
"sample-store")
.withKeySerde(Serdes.String())
.withValueSerde(jsonSerde));
解决方案
您可以使用Materialized#withRetention()
设置窗口和会话存储的保留期限。
推荐阅读
- c# - 使用c#删除解析中的一行
- c# - 如何在 C# 中从 XML 中删除一个完整的节点
- c# - asp网站项目中多目标框架的问题
- mysql - Mysql Pivot 创建
- jquery - 如何将 DIV 组与页面上的其他 div 对齐
- android - 具有固定高度的 WebView
- java - How to get value from JSON Array Plss
- angular7 - agggrid tree - 如何在 rowData 更新后保持树状态(收缩级别)?
- c - 如何使用 execlp() 查找 pid?
- audiokit - Xcode 版本创建 AudioKit 导入问题