首页 > 解决方案 > Amazon EMR 在为 Apache-Flink 提交作业时遇到 Hadoop 可恢复错误

问题描述

Added Depedency Pom Details :

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.7.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.7.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop</artifactId>
            <version>1.7.1</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.11.529</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-connectors</artifactId>
            <version>1.1.5</version>
            <type>pom</type>
        </dependency>
    </dependencies>

java.lang.UnsupportedOperationException:Hadoop 上的可恢复写入器仅支持 HDFS 和 Hadoop 版本 2.7 或更新版本 org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57) at org.apache.flink .runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) at org.apache.flink.streaming.api.functions .sink.filesystem.Buckets.(Buckets.java:112) 在 org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242) 在 org.apache.flink。 org.apache.flink 上的 streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)。streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) 在 org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) 在 org.apache.flink.streaming.api。 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 中的 operator.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)。 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 中的 initializeState(StreamTask.java:738) 704)在java.lang.Thread.run(Thread.java:748)tryRestoreFunction(StreamingFunctionUtils.java:178) 在 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator. java:96) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread .run(Thread.java:748)tryRestoreFunction(StreamingFunctionUtils.java:178) 在 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator. java:96) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread .run(Thread.java:748)restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator. java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator. java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)

标签: amazon-s3apache-flinkhadoop2

解决方案


Flink 使用称为ServiceLoader的东西来加载与可插拔文件系统接口所需的组件。如果您想了解 Flink 在代码中的哪个位置,请转到org.apache.flink.core.fs.FileSystem. 注意initialize使用RAW_FACTORIES变量的函数。RAW_FACTORIES由函数创建loadFileSystems,您可以看到它利用了 Java 的ServiceLoader.

在您的应用程序在 Flink 上启动之前,需要设置文件系统组件。这意味着您的 Flink 应用程序不需要捆绑这些组件,它们应该为您的应用程序提供。

EMR 不提供 Flink 需要使用 S3 作为开箱即用的流式文件接收器的 S3 文件系统组件。抛出这个异常不是因为版本不够高,而是因为 Flink 在没有匹配该s3方案的 FileSystem 的情况下加载了 HadoopFileSystem(请参见此处的代码)。

您可以通过为我的 Flink 应用程序启用 DEBUG 日志记录级别来查看您的文件系统是否正在加载,EMR 允许您在配置中执行此操作:

{
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.rootLogger": "DEBUG,file"
    }
  },{
    "Classification": "flink-log4j-yarn-session",
    "Properties": {
      "log4j.rootLogger": "DEBUG,stdout"
    }
  }

相关日志在 YARN 资源管理器中可用,查看单个节点的日志。搜索字符串"Added file system"应该可以帮助您找到所有成功加载的文件系统。

在这项调查中也很方便的是通过 SSH 连接到主节点并使用flink-scala REPL,在那里我可以看到 FileSystem Flink 决定在给定文件 URI 的情况下加载什么。

/usr/lib/flink/lib/解决方案是在启动 Flink 应用程序之前将 S3 文件系统实现的 JAR 放入其中。这可以通过抓取flink-s3-fs-hadoopflink-s3-fs-presto(取决于您使用的实现)的引导操作来完成。我的引导操作脚本如下所示:

sudo mkdir -p /usr/lib/flink/lib
cd /usr/lib/flink/lib

sudo curl -O https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-s3-fs-hadoop/1.8.1/flink-s3-fs-hadoop-1.8.1.jar

推荐阅读