Flink on Kubernetes: Azure Blob Storage and S3 file system implementations not found

Problem description

I am hosting Kubernetes on Azure, and I am trying to write Flink streaming data to both AWS S3 and Azure Blob Storage.

I am using a Flink session cluster for this, created as follows:

./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<cluster_name>
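
Jobs are then submitted against that session; a sketch of the submit command, assuming the native Kubernetes session deployment (cluster id and jar path are placeholders):

./bin/flink run --target kubernetes-session -Dkubernetes.cluster-id=<cluster_name> ./my-job.jar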

To support "wasbs://" (the Azure Blob Storage FS URI) and "s3://" (the AWS S3 FS URI), I followed the steps mentioned here.


Step 1: Log in to the JobManager and create the plugin directories under the "plugins" directory:

For S3:

mkdir s3-fs-hadoop
mkdir s3-fs-presto

Step 2: Copy the corresponding .jar files into those folders:

s3-fs-presto/flink-s3-fs-presto-1.13.0.jar
s3-fs-hadoop/flink-s3-fs-hadoop-1.13.0.jar
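
Concretely, steps 1 and 2 amount to something like the following inside the JobManager container (a sketch; it assumes the standard apache/flink image layout, where the optional filesystem jars ship under /opt/flink/opt):

# run inside the JobManager container (e.g. via kubectl exec)
cd /opt/flink
mkdir -p plugins/s3-fs-hadoop plugins/s3-fs-presto
cp opt/flink-s3-fs-hadoop-1.13.0.jar plugins/s3-fs-hadoop/
cp opt/flink-s3-fs-presto-1.13.0.jar plugins/s3-fs-presto/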

Step 3: Run the application.


The problem:

Even after completing the steps above, I hit the following exception every time:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:513)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:407)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:214)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
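
The exception also mentions a fallback option. If a Hadoop distribution were on the classpath, the corresponding flink-conf.yaml entry would presumably be the following, though I have not verified it:

fs.allowed-fallback-filesystems: s3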

What I have tried:

I even enabled the plugins in my JobManager YAML:

...
env:
  - name: _POD_IP_ADDRESS
    valueFrom:
      fieldRef:
        apiVersion: v1
        fieldPath: status.podIP
  - name: ENABLE_BUILT_IN_PLUGINS
    value: flink-s3-fs-presto-1.13.0.jar;flink-s3-fs-hadoop-1.13.0.jar

Here I listed only the S3 jars, but when I put flink-azure-fs-hadoop-1.13.0.jar in place for Blob Storage I received the same exception, only this time it complained about the "wasbs://" FileSystem:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'wasb'/'wasbs'
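
For that Blob Storage attempt, the env entry would look like this (a sketch; it simply appends the Azure jar named above to the semicolon-separated list):

- name: ENABLE_BUILT_IN_PLUGINS
  value: flink-s3-fs-presto-1.13.0.jar;flink-s3-fs-hadoop-1.13.0.jar;flink-azure-fs-hadoop-1.13.0.jar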

I even added the Azure storage account key in the YAML so the job could authenticate.
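
The key entry follows the format from the Flink Azure filesystem documentation (account name and key are placeholders):

fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>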

The YAML for S3:

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"apps/v1","kind":"Deployment","metadata":{"annotations":{},"name":"flink-jobmanager","namespace":"default"},"spec":{"replicas":1,"selector":{"matchLabels":{"app":"flink","component":"jobmanager"}},"template":{"metadata":{"labels":{"app":"flink","component":"jobmanager"}},"spec":{"containers":[{"args":["jobmanager","$(POD_IP)"],"env":[{"name":"POD_IP","valueFrom":{"fieldRef":{"apiVersion":"v1","fieldPath":"status.podIP"}}}],"image":"apache/flink:1.13.0-scala_2.12","livenessProbe":{"initialDelaySeconds":30,"periodSeconds":60,"tcpSocket":{"port":6123}},"name":"jobmanager","ports":[{"containerPort":6123,"name":"rpc"},{"containerPort":6124,"name":"blob-server"},{"containerPort":8081,"name":"webui"}],"securityContext":{"runAsUser":9999},"volumeMounts":[{"mountPath":"/opt/flink/conf","name":"flink-config-volume"}]}],"serviceAccountName":"default","volumes":[{"configMap":{"items":[{"key":"flink-conf.yaml","path":"flink-conf.yaml"},{"key":"log4j-console.properties","path":"log4j-console.properties"}],"name":"flink-config"},"name":"flink-config-volume"}]}}}}
  creationTimestamp: "2021-07-22T09:34:02Z"
  generation: 1
  name: flink-jobmanager
  namespace: default
  resourceVersion: "4634197"
  uid: bc8d7b32-6384-4ce2-ac87-e9f752d1fc17
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - args:
        - jobmanager
        - $(POD_IP)
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        image: apache/flink:1.13.0-scala_2.12
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 3
          initialDelaySeconds: 30
          periodSeconds: 60
          successThreshold: 1
          tcpSocket:
            port: 6123
          timeoutSeconds: 1
        name: jobmanager
        ports:
        - containerPort: 6123
          name: rpc
          protocol: TCP
        - containerPort: 6124
          name: blob-server
          protocol: TCP
        - containerPort: 8081
          name: webui
          protocol: TCP
        resources: {}
        securityContext:
          runAsUser: 9999
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /opt/flink/conf
          name: flink-config-volume
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: default
      serviceAccountName: default
      terminationGracePeriodSeconds: 30
      volumes:
      - configMap:
          defaultMode: 420
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
          name: flink-config
        name: flink-config-volume
status:
  availableReplicas: 1
  conditions:
  - lastTransitionTime: "2021-07-22T09:34:38Z"
    lastUpdateTime: "2021-07-22T09:34:38Z"
    message: Deployment has minimum availability.
    reason: MinimumReplicasAvailable
    status: "True"
    type: Available
  - lastTransitionTime: "2021-07-22T09:34:02Z"
    lastUpdateTime: "2021-07-22T09:34:38Z"
    message: ReplicaSet "flink-jobmanager-678bdbd99c" has successfully progressed.
    reason: NewReplicaSetAvailable
    status: "True"
    type: Progressing
  observedGeneration: 1
  readyReplicas: 1
  replicas: 1
  updatedReplicas: 1

The pom.xml of the Flink job:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>FlinkScala</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>Flink Quickstart Job</name>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.0</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.11</scala.version>
    <log4j.version>2.12.1</log4j.version>
    <mongodb.hadoop.version>1.3.0</mongodb.hadoop.version>
<!--    <hadoop.version>2.4.0</hadoop.version>-->
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-rabbitmq_2.11</artifactId>
      <version>1.13.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.mongodb/mongo-java-driver -->
    <dependency>
      <groupId>org.mongodb</groupId>
      <artifactId>mongo-java-driver</artifactId>
      <version>2.13.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
<!--      <scope>compile</scope>-->
<!--      <exclusions>-->
<!--        <exclusion>-->
<!--          <groupId>org.apache.flink</groupId>-->
<!--          <artifactId>flink-core</artifactId>-->
<!--        </exclusion>-->
<!--      </exclusions>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
<!--      <scope>compile</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
<!--      <scope>compile</scope>-->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.1.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.13.0</version>
<!--      <scope>test</scope>-->
    </dependency>
    <!-- Scala Library, provided by Flink as well. -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.example.FlinkS3</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${target.java.version}</source>
          <target>${target.java.version}</target>
        </configuration>
      </plugin>

      <!-- Scala Compiler -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <args>
            <arg>-nobootcp</arg>
            <arg>-target:jvm-${target.java.version}</arg>
          </args>
        </configuration>
      </plugin>

      <!-- Eclipse Scala Integration -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.8</version>
        <configuration>
          <downloadSources>true</downloadSources>
          <projectnatures>
            <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
            <projectnature>org.eclipse.jdt.core.javanature</projectnature>
          </projectnatures>
          <buildcommands>
            <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <classpathContainers>
            <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
          </classpathContainers>
          <excludes>
            <exclude>org.scala-lang:scala-library</exclude>
            <exclude>org.scala-lang:scala-compiler</exclude>
          </excludes>
          <sourceIncludes>
            <sourceInclude>**/*.scala</sourceInclude>
            <sourceInclude>**/*.java</sourceInclude>
          </sourceIncludes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <!-- Add src/main/scala to eclipse build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <!-- Add src/test/scala to eclipse build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
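
For reference, the stack trace comes from a plain file read on an s3:// path (ContinuousFileMonitoringFunction is what backs readTextFile). A minimal sketch of that kind of job, with a hypothetical bucket and path, matching the mainClass from the pom:

package org.example

import org.apache.flink.streaming.api.scala._

object FlinkS3 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // readTextFile on an s3:// URI triggers the filesystem lookup that fails above
    val lines: DataStream[String] = env.readTextFile("s3://<bucket>/input/")
    lines.print()
    env.execute("FlinkS3")
  }
}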

Is this an issue with the Flink API, or am I missing something here?

Tags: amazon-s3, yaml, apache-flink, flink-streaming, azure-aks
