首页 > 解决方案 > 如何在 Flink 中为 Google Cloud Storage 创建 RecoverableWriter

问题描述

我想使用谷歌云存储从我的流媒体作业中写入(接收)元素。DataStreamStreamingFileSink

为此,我使用Hadoop 的Google Cloud Storage 连接器作为 的实现org.apache.hadoop.fs.FileSystem并用作包装 Flink 的hadoop FileSystem类的实现。HadoopFileSystemorg.apache.flink.core.fs.FileSystem

我在我的 gradle 文件中包含了以下依赖项:

现在,根据我对源[1] [2] [3]FileSystemFactory的理解,Flink在运行时(通过java.util.ServiceLoader)动态加载实现,并且HadoopFsFactory在运行时(通过反射,如果它在类路径中找到 Hadoop)加载它然后用于创建FileSystem.

我面临的问题是RecoverableWriterHadoop 兼容包的默认值仅支持hdfs文件方案(我使用gs),因此在运行时抛出错误

所以,我(我调用extended了)和返回一个自定义实现,然后处理恢复的细节等,并创建了一个相应的类(该类用 装饰,因此应该可以被 发现)。HadoopFileSystemGCSFileSystem@overridedFileSystem#createRecoverableWriter()RecoverableWriterFileSystemFactory@AutoServiceServiceLoader

该设置在本地和本地 docker 集群上运行良好(实际上 GCS 连接器由于缺乏授权而引发错误,但这很好,因为这意味着FileSystem已加载并正在运行)但是当我将它部署到正在运行的 docker 集群时它会失败在谷歌计算引擎上。

在 GCE 上,默认值HadoopFileSystem被加载并按照方案 isgs而不是抛出异常hdfs,但我的假设是它应该已经加载了我的工厂实现,因此不应该出现这个错误。

我在 Flink v1.6.0上并使用docker-flink在 Docker 上运行长时间运行的会话集群

标签: javahadoopgoogle-cloud-storagegoogle-compute-engineapache-flink

解决方案


答案在 OP 的最后一行!!

我在一个长寿的 Session-cluster上运行,当我job.jar执行时,FileSystem初始化已经完成并且工厂已经加载!因此,当我添加我的作业时,没有进行初始化调用。

解决方案?根据您部署工作的方式,有几种方法:

  • 独立:FileSystem将包含实现的 jar 添加到lib/目录

  • Cluster ( manual):FileSystem将包含实现的 jar 添加到lib/您的zip或图像的目录或其他目录中。

  • Cluster ( docker)( long-living):创建自定义容器镜像并将 jar 添加到该lib/镜像的目录。

  • Cluster ( docker)( per-job-session):创建自定义容器映像并将所有 jars(包含FileSystem和您的作业等)添加到lib/目录中,在此处阅读有关每个作业会话的更多信息。


推荐阅读