Flink on YARN: unexpected block data error

Problem description

I have a Flink application that consumes data from a Kafka cluster and runs SQL data transformations. I run this application on EMR, and when I launch it with java -jar, it works as expected.

However, when I run it on YARN with the command flink run -m yarn-cluster -yn 2 -yjm 1g -ytm 2g JarFileName arg1 arg2

the application fails with the following stack trace.

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    ... 36 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: unexpected block data
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)

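Update:

This issue has been resolved. The root cause was related to Flink's class loader. We were originally building the jar with the Spring Boot Maven plugin. Switching to the Maven Shade plugin fixed the problem.

Reference: Integration: Apache Flink + Spring Boot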
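
The post does not spell out the mechanism, so the following is only a sketch under one assumption: the Spring Boot Maven plugin repackages the jar so that application classes live under BOOT-INF/classes and dependencies are nested under BOOT-INF/lib, a layout that an ordinary class loader does not read directly. The helper below (JarLayoutCheck is a hypothetical name, not part of Flink or Spring Boot) checks whether a jar has that layout before it is passed to flink run.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical diagnostic helper; not part of Flink or Spring Boot.
final class JarLayoutCheck {

    // Returns true if the jar contains any entry under BOOT-INF/, the
    // directory used by Spring Boot's repackaged executable jars.
    static boolean looksLikeSpringBootJar(Path jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            Enumeration<JarEntry> entries = jarFile.entries();
            while (entries.hasMoreElements()) {
                if (entries.nextElement().getName().startsWith("BOOT-INF/")) {
                    return true;
                }
            }
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java JarLayoutCheck <path-to-jar>");
            return;
        }
        Path jar = Paths.get(args[0]);
        if (looksLikeSpringBootJar(jar)) {
            System.out.println(jar + ": Spring Boot layout detected; consider building with the Maven Shade plugin instead");
        } else {
            System.out.println(jar + ": no BOOT-INF/ entries found");
        }
    }
}
```

Running it against the jar named in the flink run command shows which layout the build actually produced.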

Tags: apache-flink, hadoop-yarn, amazon-emr

Answer


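Which Flink version are you using? Are you sure the same version is installed on the cluster?

This kind of (de)serialization problem is usually caused by serializing with one version of a class and deserializing with a different one.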
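
One way to compare versions is to print where a given class was loaded from, once on the client and once on the cluster. This is a minimal sketch: ClassOrigin is a hypothetical helper, and the default class name is only an example of a Flink runtime class, so it needs Flink on the classpath. The implementation version comes from the jar manifest and may be null if the manifest does not declare one; the jar location is usually informative either way.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Flink.
final class ClassOrigin {

    // Describes the manifest version and the jar (or directory) a class was loaded from.
    static String describe(String className) throws ClassNotFoundException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // Load without running static initializers; only metadata is needed.
        Class<?> cls = Class.forName(className, false, loader);

        Package pkg = cls.getPackage();
        String version = (pkg == null) ? null : pkg.getImplementationVersion();

        // Classes from the JDK's bootstrap loader have no code source.
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "<bootstrap or unknown>"
                : source.getLocation().toString();

        return className + " version=" + version + " location=" + location;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Example class name; pass a different one as the first argument.
        String name = args.length > 0
                ? args[0]
                : "org.apache.flink.runtime.util.EnvironmentInformation";
        System.out.println(describe(name));
    }
}
```

If the two environments report different jars or versions for the same class, that mismatch is a likely source of the deserialization failure.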

