apache-kafka - 无法在 Kafka Stream App 的子拓扑之间共享状态存储中的数据
问题描述
注意:这是一个无效的问题。请忽略。
我有一个 KSA 监听两个主题(两个子拓扑),一个(子拓扑 A)写入状态存储,另一个(子拓扑 B)从状态存储读取。
写
...
stream
.mapValues(v -> new Version(v.getHeader().getOccurredAt().getSeconds(), v.getVersion().getValue()))
.groupByKey()
.aggregate(
() -> new Version(0,0),
(aggKey, newValue, aggValue) -> aggValue.getTimestamp() > newValue.getTimestamp() ? aggValue : newValue,
Materialized.<String, AgentVersion, KeyValueStore<Bytes, byte[]>>as(conf.versionStoreName())
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.serdeFrom(
new Version.Serializer(), new Version.Deserializer())));
读
ReadOnlyKeyValueStore<String, Version> getVersionStore() {
return app().store(conf.versionStoreName(), QueryableStoreTypes.keyValueStore());
}
但是,我发现B无法获取A写入的数据(A可以正确获取日期)。
我错过了什么吗?
解决方案
通过设计,子拓扑相互隔离。如果要从一个子拓扑授予对另一个子拓扑的商店的访问权限,则需要将两个子拓扑连接到一个子拓扑中。例如,通过将相应的存储添加到处理器/变压器。
参照。https://docs.confluent.io/current/streams/architecture.html#stream-partitions-and-tasks
推荐阅读
- python - python多处理读写锁
- java - Java 对 AWS 进行 HTTP 调用
- javascript - 如何在父组件html表格中显示子组件自定义显示元素
- c# - 为什么 Visual Studio 中的“将 JSON 粘贴为类”和“将 XML 粘贴为类”命令被禁用(灰显)?
- python - 编译 Cython 扩展而不调用 python setup.py 或 setuptools
- javascript - 语义 UI - 选项卡/显示 URL
- ruby - 如何避免在 Ruby 中将日期对象存储在数组中?
- python - 为什么这个递归函数的结果是 16?
- pandas - 熊猫:将数组列分解为一系列二进制列?
- java - 如何在 Spring Boot 应用程序中自动装配其他类中的 bean?