java - 尝试运行简单的 Kafka Stream App 时收到异常
问题描述
我一直在尝试使用 Kafka 运行一个简单的 wordcount 应用程序,但是每当我运行它时,都会出现以下错误:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/LogContext
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:630)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:610)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:557)
at StreamsApp.main(StreamsApp.java:49)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.LogContext
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:583)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
我不知道为什么我不断收到这个错误......下面列出了主要方法的代码。(第 49 行)KafkaStreams 流 = new KafkaStreams(topology, props);
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("inputTopic");
final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
wordCounts
.foreach((w, c) -> System.out.println("word: " + w + " -> " + c));
String outputTopic = "outputTopic";
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
wordCounts.to(stringSerde, longSerde, outputTopic);
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Thread.sleep(30000);
streams.close();
}
}
解决方案
您在依赖项org.apache.kafka:kafka-streams
和org.apache.kafka:kafka-clients
. 根据您的例外,您使用的kafka-clients
版本小于 1.0.0,但kafka-streams
版本等于或高于 1.0.0。
确保该kafka-clients
版本至少为 1.0.0(因此您需要升级您的kafka-clients
版本),否则您需要降级kafka-streams
版本。
推荐阅读
- vue.js - 尝试使用返回未定义的连接变量调用环境变量
- reactjs - React-router-dom 与 Material-ui 工作但不安装组件
- javascript - 在 Expo React-Native 中通过 onClick 添加 WebView
- angular - 在 Angular 中,如何创建一个自定义验证器来验证 http 请求?
- android - 非 Google Play 服务设备上的 Firebase 应用分发
- sharepoint - 将使用哪个时区;SharePoint 网站区域设置时区或 Azure Web 应用时区
- java - 使用 thread.sleep() 进行轮询是如何工作的?
- blazor - MatTooltip 不适用于 Blazor 中的 MatAutocompleteList
- maven - 如何将端点 DSL 用于 Apache Camel 中的自定义组件?
- vim - Vimscript - 将函数调用拆分为多行