首页 > 解决方案 > 在 Kafka 流处理中聚合时使用提供的主题进行更改日志和重新分区

问题描述

我正在使用 Kafka 流处理来使用 Springboot 聚合来自源对象的数据。

@Bean
public java.util.function.Consumer<KStream<String, SourceObject>> processSourceObject() {
    Serde<SourceObject> SourceObjectSerde = new JsonSerde<>(SourceObject.class);
    Serde<AgrregatedObject> AgrregatedObjectSerde = new JsonSerde<>(AgrregatedObject.class);
    return input -> input.map((key, value) -> new KeyValue<String, SourceObject>(value.uniques(), value))
            .groupByKey(Grouped.with(Serdes.String(), SourceObjectSerde))
            .aggregate(AgrregatedObject::new, (uniques, sourceObject,
                    destinationList) -> new SourceObjectUpdater().apply(sourceObject, destinationList),
                    Materialized.<String, AgrregatedObject>as(Stores.inMemoryKeyValueStore("custome-snapshots")).withKeySerde(Serdes.String()).withValueSerde(AgrregatedObjectSerde))
            .toStream().foreach((foo, bar) -> process);
}

在运行此应用程序时,连同提供给processSourceObject的主题一起,它会自动创建另外两个主题

  1. processSourceObject-applicationId-data-snapshots-changelog
  2. processSourceObject-applicationId-data-snapshots-repartition

由于某些原因,我想使用现有主题而不是使用这两个主题。我在哪里进行更改以提供预定义主题的名称,以供我的应用程序用于更改日志重新分区数据?

标签: spring-bootapache-kafka-streamsconfluent-platformstream-processingevent-stream-processing

解决方案


这取决于您使用的版本。从 Apache Kafka 2.4 开始,Streams API 允许命名所有操作员/处理器,这些名称用于重新分区和更改日志主题。

但是,所有内部主题总是以or为前缀<application.id>-和后缀,因此您只能设置部分主题名称。-repartition-changelog

例如,您可以使用Grouped.as("myName")为重新分区主题设置名称。


推荐阅读