首页 > 解决方案 > Flink 数据源迭代

问题描述

我正在尝试迭代数据源:


     val env = ExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)

      val job = Job.getInstance
      FileInputFormat.addInputPath(
        job,
        new Path("file.parquet.gz")
      )

      val hadoopInputFormat: HadoopInputFormat[Void, GenericRecord] =
        new HadoopInputFormat(
          new AvroParquetInputFormat[GenericRecord],
          classOf[Void],
          classOf[GenericRecord],
          job
        )
       val data: DataSource[tuple.Tuple2[Void, GenericRecord]] = env.createInput(hadoopInputFormat)

当我执行 data.print 时,我可以看到元组中的数据。

但是当我这样做时:


    data.map
     {
       res =>
         println("!!!!!!!!!!!111")
         println( res.f1)
     }

没有任何东西被打印出来。

我想迭代数据源并获取 GenericRecord。请帮我。

标签: hadoopdatasourceapache-flinkflink-streaming

解决方案


为了在不调用printor的情况下执行 Flink 批处理程序collect,您需要调用env.execute(). 在没有上述 API 调用的情况下,只有这个调用会触发程序的执行。


推荐阅读