首页 > 解决方案 > Kafka 流 - 第一个示例 WordCount 不起作用(计数值错误)

问题描述

我正在使用 KafkaStreams 和 Confluent,并且正在尝试开发“字数”流。

基本上,这个想法是从一个主题(wordcount-input)中读取,计算单词的数量,然后将单词的数量写入另一个主题(wordcount-output)。

实现看起来很简单,但是在 Confluent 中查看输出主题时,我没有显示值,而是得到以下输出。

输入主题

输出主题

下面,我附上开发的类型学的代码。

StreamsBuilder builder = new StreamsBuilder();

// reading from input topic
KStream<String, String> textLines = builder.stream("wordcount-input", Consumed.with(Serdes.String(), Serdes.String()));

KStream<String, Long> wordsCount = textLines
   .mapValues((ValueMapper<? super String, ? extends String>) String::toLowerCase)
   .flatMapValues(value -> Arrays.asList(value.split(" "))).selectKey((ignoredKey, word) -> word)
   .groupByKey()
   .count()
   .toStream();
  
// writing to output topic
wordsCount.to("wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

作为测试,我正在通过控制台提取信息,并且工作正常,所以失败是在发送输出主题时。

wordsCount.foreach((k,v) -> System.out.println(k + "--" + v));

你知道发生了什么吗?

附上

// configuration properties
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-test-app");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

标签: spring-bootapache-kafkaapache-kafka-streams

解决方案


我在您的代码片段中发现您的拓扑没有问题。(查看完整代码会很有用)。问题可能出在仪表板中,您应该通过创建 Kafka 使用者进行检查,使用 StringDeserializer 作为键,使用 LongDeserializer 作为值,并订阅您的输出主题。不要忘记最早使用 AUTO_OFFSET_RESET_CONFIG 来查看您现有的消息。


推荐阅读