Flink 1.10 on YARN fails to consume from Kafka 0.9

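Problem description

I want to submit a job in Flink on YARN mode that simply consumes records and prints them. The code runs fine on my local machine but fails when submitted to YARN. I tried changing the dependencies and putting the jars into the lib directory, but it didn't work. Any ideas? Thanks a lot.

The code and environment are as follows.

Flink 1.10, Kafka 0.9.

Code: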

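        // Consume the "test" topic as plain strings; the diamond operator avoids a raw type.
        FlinkKafkaConsumer09<String> kafkaSource = new FlinkKafkaConsumer09<>(
                "test",
                new SimpleStringSchema(),
                properties);

        // Start from the latest offsets and print every record with a single sink task.
        kafkaSource.setStartFromLatest();
        DataStream<String> stream = env.addSource(kafkaSource);
        stream.print().setParallelism(1);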

Part of the pom:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
        <scope>compile</scope>
    </dependency>

Error message

2020-05-14 20:22:22,925 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Sink: Print to Std. Out (1/1) (dd3a239f2854c735c23ab307ff277e62) switched from RUNNING to FAILED.
java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Lorg/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge09;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/api/common/io/ratelimiting/FlinkConnectorRateLimiter;)V
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:113)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.createFetcher(FlinkKafkaConsumer09.java:270)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:695)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)

Tags: apache-flink, kafka-consumer-api

Solution
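
A NoSuchMethodError on a constructor generally means that the class found at run time comes from a different version of a jar than the one the caller was compiled against. In the stack trace above, Kafka09Fetcher calls a KafkaConsumerThread constructor that the loaded KafkaConsumerThread does not have, so the two classes are probably coming from different connector versions on the YARN classpath. One way to check this is to print the jar each class is loaded from. The following is a minimal sketch, not a confirmed fix: the class name `ClassOrigin` and the helper `locate` are hypothetical, and it assumes the thread's context class loader is the one that loads the job's classes.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Returns the location a class is loaded from, or a placeholder if it cannot be determined. */
    public static String locate(String className) {
        try {
            // Assumption: the context class loader sees the same classes as the running job.
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            // 'false' means: load the class without running its static initializers.
            Class<?> cls = Class.forName(className, false, loader);
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Classes from the bootstrap loader (e.g. java.lang.String) have no code source.
            if (source == null || source.getLocation() == null) {
                return "<bootstrap or unknown>";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // The two classes named in the stack trace; they should resolve to the same jar.
        String[] names = {
            "org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher",
            "org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If the two classes resolve to different jars, for example one inside the job's fat jar and another under Flink's lib directory, keeping a single connector jar whose version matches the cluster's Flink version is the usual next thing to try.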

