apache-kafka - 如何从没有键的主题中加载 GlobalKTable?
问题描述
当我需要将没有键的主题加载为 akTable
时,我通常将主题作为 a 使用kStream
,重新映射它以提取键/值,并使用 akGroupedStream
来构建 thekTable
及其存储:
KStream<String, String> mappingStream = builder
.stream(TOPIC_MAPPING_IN, consumed)
.map(
(key, value) -> KeyValue.pair(
value.get("my_key").asText(),
value.get("my_value").asText())
);
KGroupedStream<String, String> mappingGroupedStream = mappingStream.groupByKey(Serialized.with(Serdes.String(),Serdes.String() ));
KTable<String,String> mappingTable = mappingGroupedStream.aggregate(
() -> "", //initializer
(aggKey, newValue, aggValue) -> newValue,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(MY_MAPPINGS_STORE)
.withValueSerde(Serdes.String())
.withKeySerde(Serdes.String())
);
现在,我们开始划分主题,我GlobalKTable
尝试GlobalKTable
利用builder.globalTable(...)
.
我想如果我们编写一个从 加载数据topic_in_WITHOUT_key
、伪造密钥并发布到topic_in_WITH_key
. 并且GlobalKTable
是从这个加载的topic_in_WITH_key
。
这是唯一的方法吗?
我对这个解决方案的主要关注是让某人创建/配置中间主题,可能会出现错误。在我介绍的情况下kTable
,changelog
主题充当中间主题,但这changelog-topic
是由应用程序自动创建的,具有适当的配置。
解决方案
推荐阅读
- swift - 在 swift 4 中迭代 3 度嵌套字典
- firebase - ReferenceError:使用 onAuthStateChanged 时未定义 firebase 错误
- sql - 如果我的 where 条件使用 Hive 给出空输出,如何显示表中的所有记录
- xml - 如何解决 Strings.xml 中高棉字符集的特定无效字符?
- git - 在 Windows 上删除 Git 的缓存密码
- javascript - Angularjs - 在 ng-repeat 中 Md-select 中断
- javascript - Ant design - 有没有办法获得我实际使用的样式?
- vue.js - 使日期时间列在 Vue 表 2 中可排序
- java - 在链表中搜索对象变量
- r - 神经网络输出不一致