java - Apache Kafka (KStreams):如何订阅多个主题?
问题描述
我有以下代码
//Kafka Config setup
Properties props = ...; //setup
List<String> topicList = Arrays.asList({"A", "B", "C"});
StreamBuilder builder = new StreamBuilder();
KStream<String, String> source = builder.stream(topicList);
source
.map((k,v) -> { //busy code for mapping data})
.transformValues(new MyGenericTransformer());
.to((k,v,r) -> {//busy code for topic routing});
new KafkaStream(builder.build(), properties).start();
我的问题:当我添加多个订阅主题(即上面的 A、B、C)时,Kstream 代码停止接收记录。
参考资料: https ://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsBuilder.html
相关文件
public <K,V> KStream<K,V> stream(java.util.Collection<java.lang.String> topics)
"If multiple topics are specified there is no ordering guarantee for records from different topics."
我想要实现的目标:让一个 Kstream(即上面的“源”)从多个主题中消费/处理。
解决方案
主题是否共享相同的键?
请注意,指定的输入主题必须按 key 进行分区。如果不是这种情况,则用户有责任在将任何基于键的操作(如聚合或连接)应用于返回的 KStream 之前重新分区数据。
这可能是你的阻碍。
另一个可能的问题可能是使用的消费者组。
推荐阅读
- google-cloud-dataflow - 使用 Cloud Composer 启动 CloudDataFlow Java 应用程序时出错
- javascript - draw() 完成回调
- qlikview - 无法使用 PptxGenJs 库设置 Html 表的位置
- jquery - 将 optgroup 中的动态选项添加到 select2
- visual-studio-code - Vi 'z' 命令是否有等效的 VSCODE?
- android - 保护 Firebase 实时数据库,授予对 Android 穿戴设备的访问权限
- scala - Scala - 如何使用模式匹配返回 Option[String] 而不是 Any?
- c# - 临时值类型如何装箱
- linux - 如何在 LXSession (LXDE, Lubuntu) 中使用很棒的 WM
- python - Python 3.6 仍然是使用 homebrew 安装并升级到 3.7 后系统报告的 python 版本