首页 > 解决方案 > 将数据存储到 GlobalKTable

问题描述

我无法理解如何将数据保存到 globalKTable。我尝试使用在 java 中创建 GlobalKTable

GlobalKTable globalTable = builder.globalTable(inputTopic, Materialized.>as("global-store"));

在此之后,我无法找到任何将数据直接存储到此 GlobalKTable 的示例。它在任何地方都与连接或流一起使用。

标签: javaapache-kafkaapache-kafka-streams

解决方案


由于每个 Stream 应用程序实例加载整个GlobalKTable数据,它默认禁用记录到changelog主题,并且GlobalKTable使用输入主题作为更改日志源进行恢复过程(以实现容错),因此更改全局存储并没有意义不能那样做。

您可以通过使用获得对处理器 API 中全局状态的只读访问权限,ProcessorContext.getStateStore("global-store")而无需将存储添加到Processor.

实现此目的的一种方法是将要进行的更新推送到 ,inputTopic以便它在所有应用程序实例中更新每个 GlobalKTable。


推荐阅读