apache-kafka-streams - 如何使用处理器 API 访问 DSL 创建的 KTable/GlobalKTable?
问题描述
我正在使用处理器 API (PAPI) 拓扑。
是否可以从处理器 API 中访问使用 DSL 创建的 KTable(或 GlobalKTable)(即使是只读的)?
即使用:
val builder = new StreamsBuilder()
val KTable = builder.table("topicname")
我得到了一个 KTable,但拓扑只允许您将addStateStore与 StoreBuilder 一起使用,而不是 KTable 本身。
.addStateStore(myStoreBuilder, MY_PROCESSOR_NAME)
所以我可以这样做:
def keyValueStoreBuilder[K, V](storeName: String, keySerde: Serde[K], valueSerde: Serde[V]): StoreBuilder[KeyValueStore[K, V]] = {
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName),
keySerde,
valueSerde)
}
storeName
但是,在这种情况下如何干净地获得呢?
解决方案
当您创建一个KTable
时,它会自动在内部创建一个商店,并使用生成的名称。(您可以通过 获取名称Topology#describe()
)。您还可以通过table()
使用Materialized
参数的方法为商店指定名称。
我有点不清楚,您所说的“在处理器 API 中访问 KTable”是什么意思?如果您的意思是“在 a 中访问 KTable 存储区Processor
”,您可以使用Topology#connectProcessorAndStateStores()
来授予处理器访问存储区的权限。请注意,处理器不应该写入 KTable 存储,因为table()
操作员负责维护表的状态。如果您确实写入存储,则无法保证,并且在发生故障时您可能会丢失数据。
推荐阅读
- javascript - 如何使用 WebSpeech API 将长句从语音转换为文本?
- javascript - 如何测试 JavaScript 内存使用情况
- powershell - 如何使用powershell从outlook下载文件
- go - 通过通道处理 HTTP 请求的模式
- json - 在 Golang 中读取请求正文两次
- android - 每次都显示共享服务选项
- excel - 如何引用保存在 Excel 2016/2019 中个人宏文件夹之外的 .XLAM 插件中的 Excel 宏(不是函数)?
- python - python 的 multiprocessing.Manager 是否会将我的应用程序暴露给安全漏洞?
- c# - 在 WPF 中使用 Post 从 C# 自动登录
- android - 安卓科特林。使用片段将数据绑定到 recyclerview 的问题