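apache-flink - Unexpected block data error with Flink on YARN
Problem description
I have a Flink application that consumes data from a Kafka cluster and runs SQL transformations on it. I run this application on EMR, and when I start it with java -jar, it works as expected.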
However, when I run it on YARN with the command flink run -m yarn-cluster -yn 2 -yjm 1g -ytm 2g JarFileName arg1 arg2, the application fails with the following stack trace:
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 36 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: unexpected block data
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)
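The bottom frames of the trace show the task deserializing the operator through InstantiationUtil.deserializeObject, which is plain Java serialization over an ObjectInputStream. The sketch below illustrates the general technique of resolving classes through a caller-supplied class loader; it assumes this is representative of what Flink does internally (Flink has its own ClassLoaderObjectInputStream, whose actual code may differ). If the loader resolves different class definitions (or none) than the ones used when the object was serialized, reading the object graph can fail. MyUserFunction is a hypothetical stand-in for a user function.

```java
import java.io.*;

public class ClassLoaderAwareDeserialization {

    /** ObjectInputStream that resolves classes through a caller-supplied ClassLoader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                // Look the class up in the supplied (e.g. user-code) class loader first.
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default JDK resolution.
                return super.resolveClass(desc);
            }
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), cl)) {
            return (T) ois.readObject();
        }
    }

    /** Hypothetical user function standing in for a Flink operator. */
    static final class MyUserFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        final String sql;

        MyUserFunction(String sql) {
            this.sql = sql;
        }
    }

    public static void main(String[] args) throws Exception {
        MyUserFunction original = new MyUserFunction("SELECT * FROM events");
        byte[] bytes = serialize(original);
        // Deserialize with an explicit class loader, as a task would with its user-code loader.
        MyUserFunction copy = deserialize(bytes, ClassLoaderAwareDeserialization.class.getClassLoader());
        System.out.println(copy.sql);
    }
}
```

Here the same loader is used on both sides, so the round trip succeeds; the failure in the question corresponds to the case where the cluster-side loader does not see the classes the way the client did.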
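Update:
This issue has been resolved. The root cause was Flink's class loader. We were originally packaging the job with the Spring Boot Maven plugin; switching to the Maven Shade plugin fixed the problem.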
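One plausible mechanism behind the update (an assumption; the update only says "class loader"): the Spring Boot Maven plugin's repackage goal moves application classes under BOOT-INF/classes/ and dependency jars under BOOT-INF/lib/. Spring Boot's own launcher understands that layout when the jar is started with java -jar, but a plain class loader pointed at the jar, such as Flink's user-code class loader, does not. A shaded jar keeps classes at the jar root. The hypothetical checker below reports which layout a jar uses.

```java
import java.io.IOException;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class JarLayoutCheck {

    /** Returns true if the jar looks like a Spring Boot repackaged jar (heuristic). */
    static boolean looksLikeSpringBootJar(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            // Repackaged jars keep application classes and dependencies under BOOT-INF/.
            boolean hasBootInf = jar.stream().anyMatch(e -> e.getName().startsWith("BOOT-INF/"));
            // Spring Boot records the original main class in the Start-Class manifest attribute.
            Manifest mf = jar.getManifest();
            boolean hasStartClass = mf != null && mf.getMainAttributes().getValue("Start-Class") != null;
            return hasBootInf || hasStartClass;
        }
    }

    public static void main(String[] args) throws IOException {
        for (String path : args) {
            System.out.println(path + (looksLikeSpringBootJar(path)
                    ? ": Spring Boot layout (classes under BOOT-INF/)"
                    : ": flat layout (classes at jar root)"));
        }
    }
}
```

Running it against the jar passed to flink run gives a quick hint of whether the packaging step is the problem.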
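Solution
Which Flink version are you using? Are you sure the cluster runs the same version?
(De)serialization problems like this are usually caused by different versions being used for serialization and deserialization.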
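A related thing to check, in line with the answer: if core Flink modules (for example flink-streaming-java) are packaged into the application jar instead of being declared with provided scope, the job may be serialized with one Flink version on the client and deserialized with another on the cluster. The hypothetical helper below lists org/apache/flink/ classes bundled in a jar. Connector classes such as the Kafka connector are normally bundled on purpose, so inspect the list rather than treating any hit as an error.

```java
import java.io.IOException;
import java.util.List;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

public class BundledFlinkCheck {

    /** Lists .class entries in the jar that belong to the org.apache.flink package tree. */
    static List<String> bundledFlinkEntries(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            // Collect inside the try block so the stream is consumed before the jar is closed.
            return jar.stream()
                    .map(e -> e.getName())
                    .filter(name -> name.startsWith("org/apache/flink/") && name.endsWith(".class"))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> entries = bundledFlinkEntries(args[0]);
        if (entries.isEmpty()) {
            System.out.println("No Flink classes bundled.");
        } else {
            System.out.println(entries.size() + " Flink classes bundled, e.g. " + entries.get(0));
        }
    }
}
```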