java - 将数据存储到 GlobalKTable
问题描述
我无法理解如何将数据保存到 globalKTable。我尝试使用在 java 中创建 GlobalKTable
GlobalKTable globalTable = builder.globalTable(inputTopic, Materialized.>as("global-store"));
在此之后,我无法找到任何将数据直接存储到此 GlobalKTable 的示例。它在任何地方都与连接或流一起使用。
解决方案
由于每个 Stream 应用程序实例加载整个GlobalKTable
数据,它默认禁用记录到changelog
主题,并且GlobalKTable
使用输入主题作为更改日志源进行恢复过程(以实现容错),因此更改全局存储并没有意义不能那样做。
您可以通过使用获得对处理器 API 中全局状态的只读访问权限,ProcessorContext.getStateStore("global-store")
而无需将存储添加到Processor
.
实现此目的的一种方法是将要进行的更新推送到 ,inputTopic
以便它在所有应用程序实例中更新每个 GlobalKTable。
推荐阅读
- c++ - 没有玩家 2 采取行动,基本井字游戏不会获胜
- c# - HTTP 错误 403.14 - 禁止 - ASP.NET Core 3.1 MVC
- java - 在Java中用固定数量的变量初始化随机矩阵
- node.js - npm run prod 错误:SyntaxError: Unexpected token = at new Script (vm.js:83:7)
- python - (tkinter) ConnectionRefusedError: [WinError 10061] 由于目标机器主动拒绝,无法建立连接
- python - 使用一维数组从 Keras 获得预测
- c++ - C ++:预期';' 在成员声明结束时
- java - 将属性文件工件附加到 BOM 工件
- node.js - 为什么一个月的存储日期与创建/记录的日期不同?
- c# - 我在从 Base64 字符串转换中找不到错误