apache-kafka - Flink - kafka-producer-network-thread 中未捕获的异常
问题描述
我正在创建一个读取两个 kafka 主题并写入一个 kafka 主题的流式作业。我正在使用这些版本flink 1.4.1
:kafka_2.11-1.0.0
和flink-connector-kafka-0.11_2.11
.
有时(这不是系统的),我有这个日志:
KafkaThread.6648.1 - - |43| Uncaught exception in kafka-producer-network-thread | agr-client-id-INH_INH.FRQ-AGR-20180911XXXX-1536659128943:
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:583) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:705) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:443) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.NetworkClient$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_161]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_161]
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) ~[flink-dist_2.11-1.4.1.jar:1.4.1]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_161]
... 6 common frames omitted
这agr-client-id-INH_INH.FRQ-AGR-20180911XXXX-1536659128943
是我的制片人的 group.id。
我有几份同时工作的工作。此日志可以针对一项作业而不是其他作业显示。我对消费者没有问题。感觉这个日志即使job没有写在Kafka topic里也能出现。
我的pom.xml
样子是这样的:
<properties>
<flink.version>1.4.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>install</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>target/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
<excludeTransitive>true</excludeTransitive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven.shade.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder. Otherwise,
this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>logback.xml</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.mypackage.MyClass</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
我想了解此日志出现的原因、严重性以及如何修复它...谢谢。
解决方案
这个错误很可能是因为无法连接到 Kafka 代理而发生的,原因可能是:
您的 Kafka 引导服务器不工作。
您的 Kafka 引导服务器位于另一台机器上,并且您的机器 IP 地址未列入 Kafka 服务器的白名单。
尝试运行本地 Kafka 并连接到它以查看错误是否仍然存在。尝试在所有有 Flink 的节点上运行 zookeeper。
推荐阅读
- css - 多个粘性元素在 Firefox 中消失
- java - 如何使用 ROLES 正确保护端点?
- python - np.where 有两个条件
- java - 在 Android 垃圾邮件上旋转屏幕保存的实例状态和 ActivityResultRegistry
- java - 401 on request where `permitAll()` 指定
- sql - 在 powershell 和 SQL 中转义双引号
- r - 在加载许多包时确定哪个版本的函数处于活动状态
- excel - 如果值在工作表 A 的列中但不存在于工作表 B 的列中,则将该值添加到工作表 B
- python - 是否可以将 .py 转换为包含所有图像和图标文件的单个 .exe 文件
- rdkit - 为什么 H 会因为 H 不存在的结构而出现微笑