apache-kafka-streams - KTable 状态存储持久化
问题描述
如果我在实现 KTable 时使用持久存储,状态存储是否会在应用程序重新启动时保持不变?例如,如果我使用以下内容:
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("queryable-store-name");
KTable<Long,String> table = builder.table(
"foo",
Materialized.as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String())
状态存储“可查询存储名称”是否可以在重新启动时使用先前运行的状态访问?可以说,我向主题 foo 发送了 50 条记录,并在状态存储中实现了它。然后应用程序重新启动,我仍然在状态存储中保留这 50 条记录吗?如果没有,有没有办法做到这一点?
谢谢!
解决方案
是的,状态存储默认保存在磁盘上。当应用程序重新启动application-id
且未更改时,将从磁盘恢复状态,包含所有 50 条记录。当应用程序被杀死/停止/重新启动时,将从偏移量添加新记录。
编辑:似乎您缺少 KTable 之上的聚合操作,这是必需的。请参阅我的代码示例:
final KStream<CustomerKey, ViewPage> viewPagesStream=builder.stream(INPUT_TOPIC);
final KTable<Windowed<ViewPageCountKey>,Long>uniqueViewPageCount=viewPagesStream
.map((key,value)->{
ViewPageCountKey newKey=new ViewPageCountKey(key.getProjectId(),value.getUrl());
return new KeyValue<>(newKey,value);
})
.filter((key,value)->key!=null)
.groupByKey()
.count(TimeWindows.of(WINDOW_SIZE).advanceBy(WINDOW_ADVANCE),STORE_NAME);
推荐阅读
- c# - NavigationException:Blazor 中的“Exception_WasThrown”
- spring - 对象中的空属性由 mono.block 检索
- node.js - 如何将 SSL 后端与来自客户端 Web 浏览器的自定义证书一起使用
- terraform - 通过 Terraform 在 KVM 上自动部署机器
- google-apps-script - 应用程序脚本中的xpath?
- reactjs - 轻量级图表反应
- angular - 用于验证输入字段的正则表达式
- python - 如何在 xarray 中启用季节选择作为 JJAS 而不是 JJA
- r - 测试 r 调查包中两种方法之间的差异
- java - jooq TIMESTAMP(6) 支持