首页 > 解决方案 > 无法在 Kafka Stream App 的子拓扑之间共享状态存储中的数据

问题描述

注意:这是一个无效的问题。请忽略。

我有一个 KSA 监听两个主题(两个子拓扑),一个(子拓扑 A)写入状态存储,另一个(子拓扑 B)从状态存储读取。

...
  stream
      .mapValues(v -> new Version(v.getHeader().getOccurredAt().getSeconds(), v.getVersion().getValue()))
      .groupByKey()
      .aggregate(
        () -> new Version(0,0),
        (aggKey, newValue, aggValue) -> aggValue.getTimestamp() > newValue.getTimestamp() ? aggValue : newValue,
      Materialized.<String, AgentVersion, KeyValueStore<Bytes, byte[]>>as(conf.versionStoreName())
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.serdeFrom(
          new Version.Serializer(), new Version.Deserializer())));

 ReadOnlyKeyValueStore<String, Version> getVersionStore() {
  return app().store(conf.versionStoreName(), QueryableStoreTypes.keyValueStore());
}

但是,我发现B无法获取A写入的数据(A可以正确获取日期)。
我错过了什么吗?

标签: apache-kafkaapache-kafka-streamsrocksdb

解决方案


通过设计,子拓扑相互隔离。如果要从一个子拓扑授予对另一个子拓扑的商店的访问权限,则需要将两个子拓扑连接到一个子拓扑中。例如,通过将相应的存储添加到处理器/变压器。

参照。https://docs.confluent.io/current/streams/architecture.html#stream-partitions-and-tasks


推荐阅读