首页 > 解决方案 > Kafka DSL KStream 转换器之间的 Kafka 共享状态存储

问题描述

我有一个拓扑,我使用 Transformer 来聚合我的对象,稍后在我的拓扑中,我试图从第一个 Transformer 中使用的 State Store 中读取。似乎无法访问数据。是因为状态存储在不同的分区上吗?

我的拓扑看起来像这样:

streamsBuilder.stream("input")
    .transform(new TransformerSupplier1(), "my-store")
    .leftJoin(someKTable, myValueJoiner())
    .flatTransform(new TransformerSupplier2(), "my-store")

在我TransformerSupplier1Transformer,我的国营商店是类型<String, Map<String, Object>>

在我TransformerSupplier2的 'sTransformer中,我试图通过我用来存储在第一个变压器中的密钥来获取状态存储,但我得到了 null,当我这样做时.all().peekNextKey(),什么也找不到。

如果我需要为我的变形金刚提供更多信息,请告诉我,我会尝试混淆真正的逻辑。谢谢

标签: apache-kafkaapache-kafka-streams

解决方案


推荐阅读