apache-flink - 根据 Flink 中的模式使用 GCS 文件
问题描述
由于 Flink 支持 Hadoop FileSystem 抽象,并且有一个GCS 连接器- 在 Google Cloud Storage 之上实现它的库。
如何使用此 repo 中的代码创建 Flink 文件源?
解决方案
为此,您需要:
- 在 Flink 集群上安装和配置GCS 连接器。
- 将 Hadoop 和 Flink 依赖项(包括HDFS 连接器)添加到您的项目中:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hadoop-compatibility_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> <scope>provided</scope> </dependency>
使用它来创建具有 GCS 路径的数据源:
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.hadoopcompatibility.HadoopInputs; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.TextInputFormat; ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple2<LongWritable, Text>> input = env.createInput( HadoopInputs.readHadoopFile( new TextInputFormat(), LongWritable.class, Text.class, "gs://bucket/path/some*pattern/"));
推荐阅读
- linux - 如何修改行首有字符的模式
- c - 这个 memcpy 会导致未定义的行为吗?
- laravel - laravel & vuejs - 你是否正确注册了组件?对于递归组件,请确保提供“名称”选项。在发现
- r - ggplot 的每个方面都有不同的 `geom_hline()`
- dc.js - Crossfilter 使用具有嵌套对象的数据创建维度
- java - maven在哪里安装jar?
- java - 如何打印格式正确的字符串
- android - 在显式尝试的同时启动隐式意图并完成活动
- r - 删除 ggarrange 中的空白,ggplot2
- python - Django CharField 添加到前端的选择