spring-boot - Kafka 流 - 第一个示例 WordCount 不起作用(计数值错误)
问题描述
我正在使用 KafkaStreams 和 Confluent,并且正在尝试开发“字数”流。
基本上,这个想法是从一个主题(wordcount-input)中读取,计算单词的数量,然后将单词的数量写入另一个主题(wordcount-output)。
实现看起来很简单,但是在 Confluent 中查看输出主题时,我没有显示值,而是得到以下输出。
下面,我附上开发的类型学的代码。
StreamsBuilder builder = new StreamsBuilder();
// reading from input topic
KStream<String, String> textLines = builder.stream("wordcount-input", Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, Long> wordsCount = textLines
.mapValues((ValueMapper<? super String, ? extends String>) String::toLowerCase)
.flatMapValues(value -> Arrays.asList(value.split(" "))).selectKey((ignoredKey, word) -> word)
.groupByKey()
.count()
.toStream();
// writing to output topic
wordsCount.to("wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
作为测试,我正在通过控制台提取信息,并且工作正常,所以失败是在发送输出主题时。
wordsCount.foreach((k,v) -> System.out.println(k + "--" + v));
你知道发生了什么吗?
附上
// configuration properties
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-test-app");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
解决方案
我在您的代码片段中发现您的拓扑没有问题。(查看完整代码会很有用)。问题可能出在仪表板中,您应该通过创建 Kafka 使用者进行检查,使用 StringDeserializer 作为键,使用 LongDeserializer 作为值,并订阅您的输出主题。不要忘记最早使用 AUTO_OFFSET_RESET_CONFIG 来查看您现有的消息。
推荐阅读
- docker - 在 Ubuntu VM 上重新启动后,docker 服务死机
- gnuplot - 图例框尺寸
- javascript - 在 Chrome JavaScript 调试器中单步执行代码会引发不同行的异常?
- html - 用切片拆分二维数组打字稿html
- android - 为什么我的通知没有在我的 BroadcastReceiver 类中创建?
- java - IntelliJ IDEA - CodenameOne - AndroidStudio 安装/JDK 更改后找不到 CEF 路径?
- php - 证书生成器问题[字体加载问题]
- c++ - C++ 在记忆化期间通过引用传递 std::map 并将其保存在堆栈中的最佳选择是什么
- python - 为什么在 Visual Studio 中运行代码时出现异步错误
- node.js - 无法通过在 EC2 实例上运行的 Node.js 应用程序使用 AWS SES 发送电子邮件