apache-kafka - 使用 Kafka Streams DSL 多次将同一主题用作源
问题描述
使用 Kafka Streams DSL 时,有没有办法使用相同的主题作为两个不同处理例程的源?
StreamsBuilder streamsBuilder = new StreamsBuilder();
// use the topic as a stream
streamsBuilder.stream("topic")...
// use the same topic as a source for KTable
streamsBuilder.table("topic")...
return streamsBuilder.build();
上面的幼稚实现TopologyException
在运行时抛出:无效拓扑:主题主题已被另一个来源注册。如果我们深入研究底层处理器 API,这是完全有效的。使用它是唯一的出路吗?
更新: 到目前为止我发现的最接近的替代方案:
StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<Object, Object> stream = streamsBuilder.stream("topic");
// use the topic as a stream
stream...
// create a KTable from the KStream
stream.groupByKey().reduce((oldValue, newValue) -> newValue)...
return streamsBuilder.build();
解决方案
阅读与流和表格相同的主题在语义上是有问题的恕我直言。Streams 对不可变的事实进行建模,而用于读入 KTable 模型的更改日志主题会更新。
如果你想在多个流中使用一个主题,你可以KStream
多次重用同一个对象(它在语义上就像一个广播):
KStream stream = ...
stream.filter();
stream.map();
另请比较:https ://issues.apache.org/jira/browse/KAFKA-6687 (有计划取消此限制。我怀疑,我们将允许同时使用一个主题KStream
-KTable
比较我的评论从上面)。
推荐阅读
- python - Conda:如果无法访问 conda 通道,如何忽略?忽略 UnavailableInvalidChannel?
- python - 使用 Kivy+Buildozer 构建 Android 应用
- tensorflow2.0 - 如何循环 tf.strings?
- javascript - 如果我添加 PropTypes,我需要检查 typeof 吗?
- node.js - 在 react 中使用自制的 npm 包。编译失败
- php - 如何使用 PHP、docker-compose 将用户上传的文件保存到安装的卷?
- three.js - morphTargets 不仅缩小了相关顶点,还缩小了整个网格
- cookies - 查找导致 Chrome 的 SameSite 警告的 cookie
- javascript - 在特定范围(最大范围)内旋转老虎机?
- mysql - 在一个 SQL 查询中合并两个表,并使用表名作为列名