apache-flink - Flink 1.10 on YARN fails to consume Kafka 0.9
Problem description
I want to submit a job in Flink-on-YARN mode that simply consumes records from Kafka and prints them. The code runs fine on my local machine, but it fails when submitted to YARN. I have tried changing the dependencies and putting the connector jar into the lib directory, but neither worked. Any ideas? Thanks a lot.
The code and environment are as follows.
Flink 1.10, Kafka 0.9.
Code:
// env and properties are set up earlier in the job
FlinkKafkaConsumer09<String> kafkaSource = new FlinkKafkaConsumer09<>(
        "test",
        new SimpleStringSchema(),
        properties);
kafkaSource.setStartFromLatest();
DataStream<String> stream = env.addSource(kafkaSource);
stream.print().setParallelism(1);
Part of the pom:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
    <scope>compile</scope>
</dependency>
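One thing worth noting about the pom above: everything is `compile` scope, so the core Flink artifacts get bundled into the fat jar and can clash with the cluster's own copies at runtime. As a sketch only (same artifact names as above; whether this alone fixes the error depends on what is in the cluster's lib directory), jobs submitted to YARN usually mark the core dependencies `provided` and bundle only the connector and its kafka-clients:

```xml
<!-- Sketch: core Flink dependencies marked provided so the cluster's
     own jars are used at runtime; only the Kafka connector (and its
     kafka-clients dependency) ends up inside the job's fat jar. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
```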
Error message
2020-05-14 20:22:22,925 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source:
Custom Source -> Sink: Print to Std. Out (1/1) (dd3a239f2854c735c23ab307ff277e62) switched from RUNNING to FAILED.
java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Lorg/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge09;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/api/common/io/ratelimiting/FlinkConnectorRateLimiter;)V
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:113)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.createFetcher(FlinkKafkaConsumer09.java:270)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:695)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Solution
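A `NoSuchMethodError` on a constructor like `KafkaConsumerThread.<init>` at runtime (when the same code compiled fine) almost always means two different versions of the connector classes are on the classpath: the `Kafka09Fetcher` from one connector version is calling a `KafkaConsumerThread` constructor that the class actually loaded (from another jar, e.g. a different connector version dropped into Flink's lib directory) does not have. A practical first step is to find every jar that ships the conflicting class, in both the cluster's lib directory and the job's fat jar directory, and remove the duplicates so only one connector version remains. Below is a minimal JDK-only sketch of that check (the directory paths passed on the command line are whatever your deployment uses):

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class ClasspathScan {

    // Return the names of jars under `dir` that contain an entry whose
    // path ends with `classSuffix`. More than one hit across the cluster
    // lib directory and the job jar usually means a version clash.
    static List<String> jarsContaining(File dir, String classSuffix) throws IOException {
        List<String> hits = new ArrayList<>();
        File[] jars = dir.listFiles((d, name) -> name.endsWith(".jar"));
        if (jars == null) {
            return hits;
        }
        for (File jar : jars) {
            try (ZipFile zf = new ZipFile(jar)) {
                boolean found = zf.stream()
                        .map(ZipEntry::getName)
                        .anyMatch(n -> n.endsWith(classSuffix));
                if (found) {
                    hits.add(jar.getName());
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Pass e.g. the Flink lib directory and the directory holding the
        // job's fat jar; paths are deployment-specific.
        for (String dir : args) {
            System.out.println(dir + " -> " + jarsContaining(new File(dir),
                    "kafka/internal/KafkaConsumerThread.class"));
        }
    }
}
```

If the class shows up both in a jar under lib and inside the fat jar (or in two different connector jars), keep exactly one copy: either bundle the single `flink-connector-kafka-0.9_2.11` version that matches the connector classes your job compiles against, or remove the stray connector jar from lib. Mixing connector jars built against different Flink releases reproduces exactly this constructor-signature mismatch.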