首页 > 解决方案 > 如何在 Apache Flink 的每次循环迭代中写入文件?

问题描述

我是 Apache Flink 的新手。我必须过滤计算机中的所有文件并将它们写入一个文件。但在 Flink 中,似乎不可能写入现有文件。我该如何处理?

标签: javaapache-flink

解决方案


您想要做的是使用所有文件作为 Flink 工作流的源,而不是在迭代中一次处理一个文件。通常,您通过向 Hadoop 作业配置添加路径来执行此操作,例如,这是我使用读取序列(二进制)文件的代码中的示例。

        Job job = Job.getInstance();
        FileInputFormat.addInputPath(job, new Path(options.getCrawlDir()));

        HadoopInputFormat<Tuple, Tuple> inputFormat =  HadoopInputs.createHadoopInput(new SequenceFileInputFormat<Tuple, Tuple>(), 
                Tuple.class, Tuple.class, job);

        DataSet<HomePageText> homePageData =  env.createInput(inputFormat)

推荐阅读