amazon-s3 - Amazon EMR 在为 Apache-Flink 提交作业时遇到 Hadoop 可恢复错误
问题描述
Added Depedency Pom Details :
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop</artifactId>
<version>1.7.1</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.11.529</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>1.1.5</version>
<type>pom</type>
</dependency>
</dependencies>
java.lang.UnsupportedOperationException:Hadoop 上的可恢复写入器仅支持 HDFS 和 Hadoop 版本 2.7 或更新版本 org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57) at org.apache.flink .runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) at org.apache.flink.streaming.api.functions .sink.filesystem.Buckets.(Buckets.java:112) 在 org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242) 在 org.apache.flink。 org.apache.flink 上的 streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)。streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) 在 org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) 在 org.apache.flink.streaming.api。 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 中的 operator.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)。 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 中的 initializeState(StreamTask.java:738) 704)在java.lang.Thread.run(Thread.java:748)tryRestoreFunction(StreamingFunctionUtils.java:178) 在 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator. java:96) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread .run(Thread.java:748)tryRestoreFunction(StreamingFunctionUtils.java:178) 在 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator. java:96) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread .run(Thread.java:748)restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator. java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator. java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)
解决方案
Flink 使用称为ServiceLoader的东西来加载与可插拔文件系统接口所需的组件。如果您想了解 Flink 在代码中的哪个位置,请转到org.apache.flink.core.fs.FileSystem
. 注意initialize
使用RAW_FACTORIES
变量的函数。RAW_FACTORIES
由函数创建loadFileSystems
,您可以看到它利用了 Java 的ServiceLoader
.
在您的应用程序在 Flink 上启动之前,需要设置文件系统组件。这意味着您的 Flink 应用程序不需要捆绑这些组件,它们应该为您的应用程序提供。
EMR 不提供 Flink 需要使用 S3 作为开箱即用的流式文件接收器的 S3 文件系统组件。抛出这个异常不是因为版本不够高,而是因为 Flink 在没有匹配该s3
方案的 FileSystem 的情况下加载了 HadoopFileSystem(请参见此处的代码)。
您可以通过为我的 Flink 应用程序启用 DEBUG 日志记录级别来查看您的文件系统是否正在加载,EMR 允许您在配置中执行此操作:
{
"Classification": "flink-log4j",
"Properties": {
"log4j.rootLogger": "DEBUG,file"
}
},{
"Classification": "flink-log4j-yarn-session",
"Properties": {
"log4j.rootLogger": "DEBUG,stdout"
}
}
相关日志在 YARN 资源管理器中可用,查看单个节点的日志。搜索字符串"Added file system"
应该可以帮助您找到所有成功加载的文件系统。
在这项调查中也很方便的是通过 SSH 连接到主节点并使用flink-scala REPL,在那里我可以看到 FileSystem Flink 决定在给定文件 URI 的情况下加载什么。
/usr/lib/flink/lib/
解决方案是在启动 Flink 应用程序之前将 S3 文件系统实现的 JAR 放入其中。这可以通过抓取flink-s3-fs-hadoop
或flink-s3-fs-presto
(取决于您使用的实现)的引导操作来完成。我的引导操作脚本如下所示:
sudo mkdir -p /usr/lib/flink/lib
cd /usr/lib/flink/lib
sudo curl -O https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-s3-fs-hadoop/1.8.1/flink-s3-fs-hadoop-1.8.1.jar
推荐阅读
- apache-spark - 在 pyspark 中读取、转换和输出结果的有效方法
- apache-nifi - Nifi Custom Processor - 未指定传输关系
- debugging - remix ethereum.org 中的调试器将函数返回值显示为“对象”
- bash - 不断变化的变量
- python - 在 k 个子集中拆分列表而不改变顺序
- c# - 如何指定可观察序列之间的依赖关系?
- angular - 如何使用 Angular 将动画设置为 Nebular Stepper,就像 Material 一样
- javascript - 在javascript中单击正文或窗口中的相同元素删除元素
- tensorflow - MLflow 服务于 tensorflow.keras.wrappers.scikit_learn KerasClassifier 模型
- python - 通过 python 请求发出请求时出现错误 400,但在 cURL 中有效