首页 > 解决方案 > 如何从没有键的主题中加载 GlobalKTable?

问题描述

当我需要将没有键的主题加载为 akTable时,我通常将主题作为 a 使用kStream,重新映射它以提取键/值,并使用 akGroupedStream来构建 thekTable及其存储:

KStream<String, String> mappingStream = builder
      .stream(TOPIC_MAPPING_IN, consumed)
      .map(
          (key, value) -> KeyValue.pair(
              value.get("my_key").asText(), 
              value.get("my_value").asText())
       );

KGroupedStream<String, String> mappingGroupedStream = mappingStream.groupByKey(Serialized.with(Serdes.String(),Serdes.String() ));

KTable<String,String> mappingTable = mappingGroupedStream.aggregate(
      () -> "", //initializer 
      (aggKey, newValue, aggValue) -> newValue,  
      Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(MY_MAPPINGS_STORE)
          .withValueSerde(Serdes.String())
          .withKeySerde(Serdes.String())
       );

现在,我们开始划分主题,我GlobalKTable尝试GlobalKTable利用builder.globalTable(...).

我想如果我们编写一个从 加载数据topic_in_WITHOUT_key、伪造密钥并发布到topic_in_WITH_key. 并且GlobalKTable是从这个加载的topic_in_WITH_key

这是唯一的方法吗?

我对这个解决方案的主要关注是让某人创建/配置中间主题,可能会出现错误。在我介绍的情况下kTablechangelog主题充当中间主题,但这changelog-topic是由应用程序自动创建的,具有适当的配置。

标签: apache-kafkaapache-kafka-streams

解决方案


推荐阅读