首页 > 解决方案 > 使用 Kafka Streams DSL 多次将同一主题用作源

问题描述

使用 Kafka Streams DSL 时,有没有办法使用相同的主题作为两个不同处理例程的源?

StreamsBuilder streamsBuilder = new StreamsBuilder();

// use the topic as a stream
streamsBuilder.stream("topic")...

// use the same topic as a source for KTable
streamsBuilder.table("topic")...

return streamsBuilder.build();

上面的幼稚实现TopologyException在运行时抛出:无效拓扑:主题主题已被另一个来源注册。如果我们深入研究底层处理器 API,这是完全有效的。使用它是唯一的出路吗?

更新: 到目前为止我发现的最接近的替代方案:

StreamsBuilder streamsBuilder = new StreamsBuilder();

final KStream<Object, Object> stream = streamsBuilder.stream("topic");

// use the topic as a stream
stream...

// create a KTable from the KStream
stream.groupByKey().reduce((oldValue, newValue) -> newValue)...

return streamsBuilder.build();

标签: apache-kafkaapache-kafka-streams

解决方案


阅读与流和表格相同的主题在语义上是有问题的恕我直言。Streams 对不可变的事实进行建模,而用于读入 KTable 模型的更改日志主题会更新。

如果你想在多个流中使用一个主题,你可以KStream多次重用同一个对象(它在语义上就像一个广播):

KStream stream = ...
stream.filter();
stream.map();

另请比较:https ://issues.apache.org/jira/browse/KAFKA-6687 (有计划取消此限制。我怀疑,我们将允许同时使用一个主题KStream-KTable比较我的评论从上面)。


推荐阅读