amazon-s3 - Flink on Kubernetes: Azure Blob Storage and S3 file system implementations not found
Problem description
I have Kubernetes hosted on Azure, and I am trying to store Flink streaming data in AWS S3 and Azure Blob Storage.
I am using a Flink session cluster for this task, created as follows:
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<cluster_name>
To support the "wasbs://" (Azure Blob Storage FS URI) and "s3://" (AWS S3 FS URI) schemes, I followed the steps mentioned here.
Step 1: Log in to the JobManager and create the plugin directories under the "plugins" directory:
For S3:
mkdir s3-fs-hadoop
mkdir s3-fs-presto
Step 2: Copy the corresponding .jar files into those folders:
s3-fs-presto/flink-s3-fs-presto-1.13.0.jar
s3-fs-hadoop/flink-s3-fs-hadoop-1.13.0.jar
Step 3: Run the application.
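For reference, steps 1-3 can be sketched as a shell session. A scratch directory stands in for the image's /opt/flink layout here so the commands can run anywhere; inside the real pod you would target /opt/flink instead.

```shell
# Sketch of steps 1-3; a scratch directory stands in for /opt/flink.
FLINK_HOME=$(mktemp -d)
mkdir -p "$FLINK_HOME/opt" "$FLINK_HOME/plugins"
# Stand-ins for the jars shipped in the image's opt/ directory:
touch "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.13.0.jar" \
      "$FLINK_HOME/opt/flink-s3-fs-presto-1.13.0.jar"
# Each plugin must live in its own subfolder under plugins/:
for fs in s3-fs-hadoop s3-fs-presto; do
  mkdir "$FLINK_HOME/plugins/$fs"
  cp "$FLINK_HOME/opt/flink-$fs-1.13.0.jar" "$FLINK_HOME/plugins/$fs/"
done
```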
The problem:
Even after completing the steps above, I hit the following exception every time:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:513)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:407)
at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:214)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
What I tried:
I even enabled the plugins in my JobManager YAML:
...
env:
- name: _POD_IP_ADDRESS
  valueFrom:
    fieldRef:
      apiVersion: v1
      fieldPath: status.podIP
- name: ENABLE_BUILT_IN_PLUGINS
  value: flink-s3-fs-presto-1.13.0.jar;flink-s3-fs-hadoop-1.13.0.jar
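My understanding (an assumption based on the flink-docker entrypoint, not the actual script) is that the official image treats ENABLE_BUILT_IN_PLUGINS as a semicolon-separated list of jar names and copies each one from opt/ into its own subfolder under plugins/, roughly like this simplified sketch, simulated here in a scratch directory:

```shell
# Simplified sketch of how the image entrypoint might expand ENABLE_BUILT_IN_PLUGINS
# (assumption; the real entrypoint and its folder-naming scheme may differ).
FLINK_HOME=$(mktemp -d)
mkdir -p "$FLINK_HOME/opt" "$FLINK_HOME/plugins"
touch "$FLINK_HOME/opt/flink-s3-fs-presto-1.13.0.jar" \
      "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.13.0.jar"
ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-presto-1.13.0.jar;flink-s3-fs-hadoop-1.13.0.jar"
for jar in ${ENABLE_BUILT_IN_PLUGINS//;/ }; do
  plugin="${jar%-[0-9]*.jar}"   # strip "-1.13.0.jar" to get the plugin folder name
  mkdir -p "$FLINK_HOME/plugins/$plugin"
  cp "$FLINK_HOME/opt/$jar" "$FLINK_HOME/plugins/$plugin/"
done
```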
Here I added only the jars for S3; when I instead put flink-azure-fs-hadoop-1.13.0.jar in place for Blob Storage, I got the same exception, except that it complained about the "wasbs://" FileSystem:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'wasb'/'wasbs'
I even added the Azure storage account key in the YAML for access.
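For the key, the flink-conf.yaml entry I used follows the documented pattern below (the account name and key shown are placeholders, not my actual values):

```yaml
# Placeholder values; substitute your own storage account name and key.
fs.azure.account.key.<storage-account-name>.blob.core.windows.net: <storage-account-key>
```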
The YAML for S3:
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"apps/v1","kind":"Deployment","metadata":{"annotations":{},"name":"flink-jobmanager","namespace":"default"},"spec":{"replicas":1,"selector":{"matchLabels":{"app":"flink","component":"jobmanager"}},"template":{"metadata":{"labels":{"app":"flink","component":"jobmanager"}},"spec":{"containers":[{"args":["jobmanager","$(POD_IP)"],"env":[{"name":"POD_IP","valueFrom":{"fieldRef":{"apiVersion":"v1","fieldPath":"status.podIP"}}}],"image":"apache/flink:1.13.0-scala_2.12","livenessProbe":{"initialDelaySeconds":30,"periodSeconds":60,"tcpSocket":{"port":6123}},"name":"jobmanager","ports":[{"containerPort":6123,"name":"rpc"},{"containerPort":6124,"name":"blob-server"},{"containerPort":8081,"name":"webui"}],"securityContext":{"runAsUser":9999},"volumeMounts":[{"mountPath":"/opt/flink/conf","name":"flink-config-volume"}]}],"serviceAccountName":"default","volumes":[{"configMap":{"items":[{"key":"flink-conf.yaml","path":"flink-conf.yaml"},{"key":"log4j-console.properties","path":"log4j-console.properties"}],"name":"flink-config"},"name":"flink-config-volume"}]}}}}
  creationTimestamp: "2021-07-22T09:34:02Z"
  generation: 1
  name: flink-jobmanager
  namespace: default
  resourceVersion: "4634197"
  uid: bc8d7b32-6384-4ce2-ac87-e9f752d1fc17
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - args:
        - jobmanager
        - $(POD_IP)
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        image: apache/flink:1.13.0-scala_2.12
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 3
          initialDelaySeconds: 30
          periodSeconds: 60
          successThreshold: 1
          tcpSocket:
            port: 6123
          timeoutSeconds: 1
        name: jobmanager
        ports:
        - containerPort: 6123
          name: rpc
          protocol: TCP
        - containerPort: 6124
          name: blob-server
          protocol: TCP
        - containerPort: 8081
          name: webui
          protocol: TCP
        resources: {}
        securityContext:
          runAsUser: 9999
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /opt/flink/conf
          name: flink-config-volume
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: default
      serviceAccountName: default
      terminationGracePeriodSeconds: 30
      volumes:
      - configMap:
          defaultMode: 420
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
          name: flink-config
        name: flink-config-volume
status:
  availableReplicas: 1
  conditions:
  - lastTransitionTime: "2021-07-22T09:34:38Z"
    lastUpdateTime: "2021-07-22T09:34:38Z"
    message: Deployment has minimum availability.
    reason: MinimumReplicasAvailable
    status: "True"
    type: Available
  - lastTransitionTime: "2021-07-22T09:34:02Z"
    lastUpdateTime: "2021-07-22T09:34:38Z"
    message: ReplicaSet "flink-jobmanager-678bdbd99c" has successfully progressed.
    reason: NewReplicaSetAvailable
    status: "True"
    type: Progressing
  observedGeneration: 1
  readyReplicas: 1
  replicas: 1
  updatedReplicas: 1
The pom.xml of the Flink code:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkScala</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.0</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.11</scala.version>
        <log4j.version>2.12.1</log4j.version>
        <mongodb.hadoop.version>1.3.0</mongodb.hadoop.version>
        <!-- <hadoop.version>2.4.0</hadoop.version> -->
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-rabbitmq_2.11</artifactId>
            <version>1.13.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.mongodb/mongo-java-driver -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>2.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>compile</scope> -->
            <!-- <exclusions> -->
            <!--     <exclusion> -->
            <!--         <groupId>org.apache.flink</groupId> -->
            <!--         <artifactId>flink-core</artifactId> -->
            <!--     </exclusion> -->
            <!-- </exclusions> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>compile</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>compile</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.0</version>
            <!-- <scope>test</scope> -->
        </dependency>
        <!-- Scala Library, provided by Flink as well. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.FlinkS3</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <args>
                        <arg>-nobootcp</arg>
                        <arg>-target:jvm-${target.java.version}</arg>
                    </args>
                </configuration>
            </plugin>

            <!-- Eclipse Scala Integration -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.8</version>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <projectnatures>
                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>
                    </projectnatures>
                    <buildcommands>
                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <classpathContainers>
                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                    </classpathContainers>
                    <excludes>
                        <exclude>org.scala-lang:scala-library</exclude>
                        <exclude>org.scala-lang:scala-compiler</exclude>
                    </excludes>
                    <sourceIncludes>
                        <sourceInclude>**/*.scala</sourceInclude>
                        <sourceInclude>**/*.java</sourceInclude>
                    </sourceIncludes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <!-- Add src/main/scala to eclipse build path -->
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <!-- Add src/test/scala to eclipse build path -->
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Is this a problem with the Flink API, or am I missing something here?
Solution