apache-kafka-streams - 如何将 KTable 输出发布到特定的 Kafka 主题?
问题描述
我正在尝试使用 Kafka Streams 编写我的第一个练习应用程序来计算主题中的单词数。但是,我认为我指的是旧 API,因为在 lambda 函数的末尾,我想将 KTable 的输出放到一个主题中,但我没有看到任何这样的方法。
我所指的代码使用了一种方法to()
,但我认为现在没有这种方法。我看到toStream()
但不知道如何使用它将消息发送到特定的输出主题。
有人可以看看,因为这应该是非常基本的。
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-starter-project");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Short().getClass());
StreamsBuilder builder = new StreamsBuilder();
//1- Stream from Kafka
KStream<String, String> wordCountInput = builder.stream("word-count-input");
//2 - map values to lowercase
KTable<String,Long> wordCounts = wordCountInput
.mapValues(textlines -> textlines.toLowerCase())
//or mapValues(String::toLowercase())
//3- flatMapValues split by space
.flatMapValues(lowerCasedTextLine-> Arrays.asList(lowerCasedTextLine.split(" ")))
//4- Select key to apply a key and discard old key
.selectKey((ignoredKey,word)-> word)
//5 - groupBy key before aggregation
.groupByKey()
//6- count occurences finally
.count();
**wordCounts.to**
解决方案
下面的代码非常适合我:
// 1 stream from kafka
StreamsBuilder streamBuilder = new StreamsBuilder();
KStream<String, String> wordInputStream = streamBuilder.stream(INPUT_TOPIC);
// 2 Map values to lowercase
KTable<String, Long> wordCounts = wordInputStream.mapValues(value -> value.toLowerCase())
// 3 flat map values split by space
.flatMapValues(value -> Arrays.asList(value.split(SPACE)))
//4 select key to apply as key
.selectKey((key, value) -> value)
//5 group by key before aggregation
.groupByKey()
// 6 count
.count(Named.as("Counts"));
wordCounts.toStream().to(OUTPUT_TOPIC); // here OUTPUT_TOPIC was used from string constant
使用卡夫卡流 3.0.0
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.0.0</version>
</dependency>
推荐阅读
- javascript - NestJS 根据浏览器的语言传递静态文件
- angular - 表格列的多个值上的角度过滤器管道
- python - 我已经尝试安装 tensorflow 两天了......它似乎安装但我无法运行它
- linux - 无与伦比的'。设置别名时出错
- angular - 如何对 ngfor 中的按钮进行单元测试
- python - Python 3 相当于 Bash 的“读取 -rs -t1 -n1”命令来轮询/等待用户的字符?
- r - 使用 lapply 在嵌套的多变量列表上应用函数
- python - 使用 cgi 字段存储发布到数据库时无法获取属性
- sql-server - 有没有办法检查 NOEXEC 状态?
- reactjs - ReactJS 中的 SignalR - 在不同组件中使用相同的 hubConnection