首页 > 解决方案 > 如何使用处理器 API 访问 DSL 创建的 KTable/GlobalKTable?

问题描述

我正在使用处理器 API (PAPI) 拓扑。

是否可以从处理器 API 中访问使用 DSL 创建的 KTable(或 GlobalKTable)(即使是只读的)?

即使用:

val builder = new StreamsBuilder()
val KTable = builder.table("topicname")

我得到了一个 KTable,但拓扑只允许您将addStateStore与 StoreBuilder 一起使用,而不是 KTable 本身。

.addStateStore(myStoreBuilder, MY_PROCESSOR_NAME)

所以我可以这样做:

def keyValueStoreBuilder[K, V](storeName: String, keySerde: Serde[K], valueSerde: Serde[V]): StoreBuilder[KeyValueStore[K, V]] = {
Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore(storeName),
  keySerde,
  valueSerde)

}

storeName但是,在这种情况下如何干净地获得呢?

标签: apache-kafka-streams

解决方案


当您创建一个KTable时,它会自动在内部创建一个商店,并使用生成的名称。(您可以通过 获取名称Topology#describe())。您还可以通过table()使用Materialized参数的方法为商店指定名称。

我有点不清楚,您所说的“在处理器 API 中访问 KTable”是什么意思?如果您的意思是“在 a 中访问 KTable 存储区Processor”,您可以使用Topology#connectProcessorAndStateStores()来授予处理器访问存储区的权限。请注意,处理器不应该写入 KTable 存储,因为table()操作员负责维护表的状态。如果您确实写入存储,则无法保证,并且在发生故障时您可能会丢失数据。


推荐阅读