首页 > 解决方案 > KTable 状态存储持久化

问题描述

如果我在实现 KTable 时使用持久存储,状态存储是否会在应用程序重新启动时保持不变?例如,如果我使用以下内容:

StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier =      Stores.persistentKeyValueStore("queryable-store-name");
 KTable<Long,String> table = builder.table(
   "foo",
   Materialized.as(storeSupplier)
               .withKeySerde(Serdes.Long())
               .withValueSerde(Serdes.String())

状态存储“可查询存储名称”是否可以在重新启动时使用先前运行的状态访问?可以说,我向主题 foo 发送了 50 条记录,并在状态存储中实现了它。然后应用程序重新启动,我仍然在状态存储中保留这 50 条记录吗?如果没有,有没有办法做到这一点?

谢谢!

标签: apache-kafka-streams

解决方案


是的,状态存储默认保存在磁盘上。当应用程序重新启动application-id且未更改时,将从磁盘恢复状态,包含所有 50 条记录。当应用程序被杀死/停止/重新启动时,将从偏移量添加新记录。

编辑:似乎您缺少 KTable 之上的聚合操作,这是必需的。请参阅我的代码示例:

final KStream<CustomerKey, ViewPage> viewPagesStream=builder.stream(INPUT_TOPIC);

final KTable<Windowed<ViewPageCountKey>,Long>uniqueViewPageCount=viewPagesStream
        .map((key,value)->{
            ViewPageCountKey newKey=new ViewPageCountKey(key.getProjectId(),value.getUrl());
            return new KeyValue<>(newKey,value);
        })
        .filter((key,value)->key!=null)
        .groupByKey()
        .count(TimeWindows.of(WINDOW_SIZE).advanceBy(WINDOW_ADVANCE),STORE_NAME);

推荐阅读