首页 > 解决方案 > Kafka Stream 自动读取新主题?

问题描述

有什么方法可以让我的 Kafka Stream 应用程序自动从新创建的主题中读取?

即使在流应用程序已经运行时创建了主题

就像在主题名称中使用通配符一样:

KStream<String, String> rawText = builder.stream("topic-input-*");

为什么我需要这个?

现在,我有多个客户端将数据(都具有相同的模式)发送到他们自己的主题,而我的流应用程序从这些主题中读取数据。然后我的应用程序进行一些转换并将结果写入单个主题

尽管所有的客户都可以写同一个主题,但一个不守规矩的客户也可以代表其他人写作。所以我为每个客户创建了单独的主题。问题是,每当有新客户出现时,我都会创建新主题并使用脚本为他们设置 ACL,但这还不够。我还必须停止我的流应用程序,编辑代码,添加新主题,编译,打包,放到服务器上,然后再次运行!

标签: apache-kafkaapache-kafka-streams

解决方案


Kafka Streams 支持模式订阅:

builder.stream(Pattern.compile("topic-input-*"));

(我希望语法是正确的;从我的脑海中不确定......但重点是,String您可以使用stream()采用模式的方法的重载,而不是传入 a。)


推荐阅读