java - 如何在 Flink 中为 Google Cloud Storage 创建 RecoverableWriter
问题描述
我想使用谷歌云存储从我的流媒体作业中写入(接收)元素。DataStream
StreamingFileSink
为此,我使用Hadoop 的Google Cloud Storage 连接器作为 的实现,org.apache.hadoop.fs.FileSystem
并用作包装 Flink 的hadoop FileSystem类的实现。HadoopFileSystem
org.apache.flink.core.fs.FileSystem
我在我的 gradle 文件中包含了以下依赖项:
compile(
"com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2"
)
compile(
"org.apache.flink:flink-connector-filesystem_2.11:1.6.0"
)
provided(
"org.apache.flink:flink-shaded-hadoop2:1.6.0"
)
现在,根据我对源[1] [2] [3]FileSystemFactory
的理解,Flink在运行时(通过java.util.ServiceLoader
)动态加载实现,并且HadoopFsFactory
在运行时(通过反射,如果它在类路径中找到 Hadoop)加载它然后用于创建FileSystem
.
我面临的问题是RecoverableWriter
Hadoop 兼容包的默认值仅支持hdfs
文件方案(我使用gs
),因此在运行时抛出错误。
所以,我(我调用extended
了)和返回一个自定义实现,然后处理恢复的细节等,并创建了一个相应的类(该类用 装饰,因此应该可以被 发现)。HadoopFileSystem
GCSFileSystem
@overrided
FileSystem#createRecoverableWriter()
RecoverableWriter
FileSystemFactory
@AutoService
ServiceLoader
该设置在本地和本地 docker 集群上运行良好(实际上 GCS 连接器由于缺乏授权而引发错误,但这很好,因为这意味着FileSystem
已加载并正在运行)但是当我将它部署到正在运行的 docker 集群时它会失败在谷歌计算引擎上。
在 GCE 上,默认值HadoopFileSystem
被加载并按照方案 isgs
而不是抛出异常hdfs
,但我的假设是它应该已经加载了我的工厂实现,因此不应该出现这个错误。
我在 Flink v1.6.0上并使用docker-flink在 Docker 上运行长时间运行的会话集群
解决方案
答案在 OP 的最后一行!!
我在一个长寿的 Session-cluster上运行,当我job.jar
执行时,FileSystem
初始化已经完成并且工厂已经加载!因此,当我添加我的作业时,没有进行初始化调用。
解决方案?根据您部署工作的方式,有几种方法:
独立:
FileSystem
将包含实现的 jar 添加到lib/
目录Cluster (
manual
):FileSystem
将包含实现的 jar 添加到lib/
您的zip
或图像的目录或其他目录中。Cluster (
docker
)(long-living
):创建自定义容器镜像并将 jar 添加到该lib/
镜像的目录。Cluster (
docker
)(per-job-session
):创建自定义容器映像并将所有 jars(包含FileSystem
和您的作业等)添加到lib/
目录中,在此处阅读有关每个作业会话的更多信息。
推荐阅读
- reactjs - 如何在输入标签(这是一个组件)中处理“autoFocus”和“required”属性?
- vim - 如何在 vimrc 文件中对 vim 函数进行单元测试?
- mysql - Shopware 6开发版安装错误
- javascript - 无法从不同的 WebApp(Tomcat)获取图像/JS
- javascript - 如何在画布上突出显示图像的一部分?
- python - 从虚拟机下载档案需要太多时间才能开始
- ionic-framework - Ionic QR Scanner returns previous value
- javascript - 以编程方式创建选项的更多数据
- python - 为什么单列会导致我的 SVM 需要一个小时?
- android - 无法在 Parse-server 上打开文件