Flink - Uncaught exception in kafka-producer-network-thread

Problem description

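I am building a streaming job that reads from two Kafka topics and writes to one Kafka topic. I am using these versions: Flink 1.4.1, kafka_2.11-1.0.0, and flink-connector-kafka-0.11_2.11.

Sometimes (not systematically), I get this log: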

KafkaThread.6648.1 - - |43| Uncaught exception in kafka-producer-network-thread | agr-client-id-INH_INH.FRQ-AGR-20180911XXXX-1536659128943: 
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1 
        at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:583) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:705) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:443) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161] 
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.NetworkClient$1 
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_161] 
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_161] 
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) ~[flink-dist_2.11-1.4.1.jar:1.4.1]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_161] 
        ... 6 common frames omitted

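agr-client-id-INH_INH.FRQ-AGR-20180911XXXX-1536659128943 is my producer's group.id.

I have several jobs running at the same time. This log can show up for one job and not for the others. I have no problems with the consumers. It seems that this log can appear even when the job has not written anything to the Kafka topic.

My pom.xml looks like this: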

<properties>
    <flink.version>1.4.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <phase>install</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>target/lib</outputDirectory>
                        <overWriteReleases>false</overWriteReleases>
                        <overWriteSnapshots>true</overWriteSnapshots>
                        <excludeTransitive>true</excludeTransitive>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>${maven.shade.version}</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder. Otherwise, 
                                        this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                    <exclude>logback.xml</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.mypackage.MyClass</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

I would like to understand why this log appears, how serious it is, and how to fix it... Thanks.

Tags: apache-kafka, apache-flink, kafka-producer-api

Solution


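This error most likely occurs because the producer cannot connect to the Kafka broker; the stack trace shows it is raised in NetworkClient.processDisconnection, that is, while a dropped connection is being handled. Possible reasons:

  • Your Kafka bootstrap server is not running.

  • Your Kafka bootstrap server is on another machine, and your machine's IP address is not whitelisted on the Kafka server.

Try running a local Kafka and connecting to it to see whether the error persists. Also try running ZooKeeper on all nodes that run Flink.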
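
To check the two connectivity causes above from a machine that runs Flink, a plain TCP probe of each bootstrap server may help. The following is only a minimal sketch: the class name BootstrapCheck, the default address localhost:9092, and the 3-second timeout are assumptions for illustration and do not come from the question. An open port shows only that something accepts connections there, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink or Kafka.
public class BootstrapCheck {

    /** Splits a "host1:port1,host2:port2" list into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unknown hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default; pass your real bootstrap.servers value as the first argument.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress address : parse(servers)) {
            boolean ok = isReachable(address.getHostString(), address.getPort(), 3000);
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + ok);
        }
    }
}
```

Run it on each Flink node with the same value the job uses for bootstrap.servers. If a server is reported as unreachable from one node only, that points to a network or whitelist problem rather than a stopped broker.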

