首页 > 解决方案 > 根据 Flink 中的模式使用 GCS 文件

问题描述

由于 Flink 支持 Hadoop FileSystem 抽象,并且有一个GCS 连接器- 在 Google Cloud Storage 之上实现它的库。

如何使用此 repo 中的代码创建 Flink 文件源?

标签: apache-flinkgoogle-cloud-dataproc

解决方案


为此,您需要:

  1. 在 Flink 集群上安装和配置GCS 连接器。
  2. 将 Hadoop 和 Flink 依赖项(包括HDFS 连接器)添加到您的项目中:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
        <scope>provided</scope>
    </dependency>
    
  3. 使用它来创建具有 GCS 路径的数据源:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TextInputFormat;
    
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    DataSet<Tuple2<LongWritable, Text>> input =
        env.createInput(
            HadoopInputs.readHadoopFile(
                new TextInputFormat(), LongWritable.class, Text.class, "gs://bucket/path/some*pattern/"));
    

推荐阅读