hadoop - Flink 数据源迭代
问题描述
我正在尝试迭代数据源:
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val job = Job.getInstance
FileInputFormat.addInputPath(
job,
new Path("file.parquet.gz")
)
val hadoopInputFormat: HadoopInputFormat[Void, GenericRecord] =
new HadoopInputFormat(
new AvroParquetInputFormat[GenericRecord],
classOf[Void],
classOf[GenericRecord],
job
)
val data: DataSource[tuple.Tuple2[Void, GenericRecord]] = env.createInput(hadoopInputFormat)
当我执行 data.print 时,我可以看到元组中的数据。
但是当我这样做时:
data.map
{
res =>
println("!!!!!!!!!!!111")
println( res.f1)
}
没有任何东西被打印出来。
我想迭代数据源并获取 GenericRecord。请帮我。
解决方案
为了在不调用print
or的情况下执行 Flink 批处理程序collect
,您需要调用env.execute()
. 在没有上述 API 调用的情况下,只有这个调用会触发程序的执行。
推荐阅读
- python - 使用 python,我需要每天使用 MongoDB api 将文档从 Azure CosmosDB 传输到 Postrgres,可能使用 Azure 函数
- javascript - 跟随光标位置将图像添加到 DOM
- python - 在通过 gerrit-python-api 包采摘樱桃时确定其是合并冲突还是相同的树?
- excel - Excel Power Query 合并和转换列
- python - 根据另一个数组的元素从 numpy 数组中删除元素
- javascript - 指定replicationRegions时的Javascript NestedStack CREATE_FAILED
- react-native - UI Kitten 中的 Top Navigation 组件如何在 React Native Stack Navigator 中用作自定义标题
- python - 字典更新序列元素#0的长度为0;2 是必需的
- java - 缺少工件 org.springframework.boot:spring-boot-starter-data-jpa:jar:2.4.5Java(0)
- sql - T-SQL 从表中删除交叉对