spring-boot - 在 Kafka 流处理中聚合时使用提供的主题进行更改日志和重新分区
问题描述
我正在使用 Kafka 流处理来使用 Springboot 聚合来自源对象的数据。
@Bean
public java.util.function.Consumer<KStream<String, SourceObject>> processSourceObject() {
Serde<SourceObject> SourceObjectSerde = new JsonSerde<>(SourceObject.class);
Serde<AgrregatedObject> AgrregatedObjectSerde = new JsonSerde<>(AgrregatedObject.class);
return input -> input.map((key, value) -> new KeyValue<String, SourceObject>(value.uniques(), value))
.groupByKey(Grouped.with(Serdes.String(), SourceObjectSerde))
.aggregate(AgrregatedObject::new, (uniques, sourceObject,
destinationList) -> new SourceObjectUpdater().apply(sourceObject, destinationList),
Materialized.<String, AgrregatedObject>as(Stores.inMemoryKeyValueStore("custome-snapshots")).withKeySerde(Serdes.String()).withValueSerde(AgrregatedObjectSerde))
.toStream().foreach((foo, bar) -> process);
}
在运行此应用程序时,连同提供给processSourceObject的主题一起,它会自动创建另外两个主题
- processSourceObject-applicationId-data-snapshots-changelog
- processSourceObject-applicationId-data-snapshots-repartition
由于某些原因,我想使用现有主题而不是使用这两个主题。我在哪里进行更改以提供预定义主题的名称,以供我的应用程序用于更改日志和重新分区数据?
解决方案
这取决于您使用的版本。从 Apache Kafka 2.4 开始,Streams API 允许命名所有操作员/处理器,这些名称用于重新分区和更改日志主题。
但是,所有内部主题总是以or为前缀<application.id>-
和后缀,因此您只能设置部分主题名称。-repartition
-changelog
例如,您可以使用Grouped.as("myName")
为重新分区主题设置名称。
推荐阅读
- c# - 使用我的 WPF GUI 冻结的 Powershell 脚本无法找出原因
- python - AWS Canary Selenium 用户代理字符串
- python - 如何打印一个数组中相对于另一个数组中的值的值
- android - 无法安装 SDK 工具,完成按钮被禁用
- git - 如何修复 git 错误:RPC 失败;HTTP 520 卷曲 22?
- powershell - 导出 CSV 文件的格式与实际命令不同
- discord.py - 当我们不是服务器的所有者时,我们如何添加机器人?
- css - CSS卡片按钮:悬停为整张卡片添加填充/边距(我认为)
- google-sheets - 希望为我的 dnd 组添加一个按钮(谷歌表格)
- css - 从 npm 花费太多时间来安装 Chakra UI