首页 > 解决方案 > 如何将 KTable 输出发布到特定的 Kafka 主题?

问题描述

我正在尝试使用 Kafka Streams 编写我的第一个练习应用程序来计算主题中的单词数。但是,我认为我指的是旧 API,因为在 lambda 函数的末尾,我想将 KTable 的输出放到一个主题中,但我没有看到任何这样的方法。

我所指的代码使用了一种方法to(),但我认为现在没有这种方法。我看到toStream()但不知道如何使用它将消息发送到特定的输出主题。

有人可以看看,因为这应该是非常基本的。

public static void main(String[] args) {

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-starter-project");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Short().getClass());


        StreamsBuilder builder = new StreamsBuilder();
        //1- Stream from Kafka
        KStream<String, String> wordCountInput = builder.stream("word-count-input");
            //2 - map values to lowercase
            KTable<String,Long> wordCounts = wordCountInput
                    .mapValues(textlines -> textlines.toLowerCase())
                    //or mapValues(String::toLowercase())
                    //3- flatMapValues split by space
                    .flatMapValues(lowerCasedTextLine-> Arrays.asList(lowerCasedTextLine.split(" ")))
                    //4- Select key to apply a key and discard old key
                    .selectKey((ignoredKey,word)-> word)
                    //5 - groupBy key before aggregation
                    .groupByKey()
                    //6- count occurences finally
                    .count();

        **wordCounts.to**

标签: apache-kafka-streams

解决方案


下面的代码非常适合我:

    // 1 stream from kafka
    StreamsBuilder streamBuilder = new StreamsBuilder();
    
    KStream<String, String> wordInputStream = streamBuilder.stream(INPUT_TOPIC);
    
    // 2 Map values to lowercase
    KTable<String, Long> wordCounts = wordInputStream.mapValues(value -> value.toLowerCase())
    // 3 flat map values split by space
    .flatMapValues(value -> Arrays.asList(value.split(SPACE)))
    //4 select key to apply as key
    .selectKey((key, value) -> value)
    //5 group by key before aggregation
    .groupByKey()
    // 6 count
    .count(Named.as("Counts"));
    
    wordCounts.toStream().to(OUTPUT_TOPIC); // here OUTPUT_TOPIC was used from string constant

使用卡夫卡流 3.0.0

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.0.0</version>
</dependency>

推荐阅读