首页 > 解决方案 > Apache flink - 阅读后将文件移动到不同的文件夹

问题描述

我正在从目录中读取 csv 文件并进行一些处理。现在 flink 只是选择该目录中的任何新文件并对其进行处理。这对我来说很好。

我陷入了两个问题:

  1. 我想记录 flink 已完成处理的文件名。
  2. 一旦 flink 完成处理,我想将处理后的文件移动到不同的文件夹。

我的代码片段是:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path(feedFileFolderPath);

RowCsvInputFormat format = new RowCsvInputFormat(filePath, FetchTypeInformation.getTypeInformation());

DataStream<Row> inputStream = env.readFile(format, feedFileFolderPath, FileProcessingMode.PROCESS_CONTINUOUSLY,
                parseInt(folderLookupTime));

标签: apache-flink

解决方案


这个话题已经在 flink 邮件列表中出现过几次——请参阅此处此处的讨论——但简短的总结是,在 Flink 中还没有一种简单的方法可以做到这一点。

似乎通常做的是使用 cron 作业定期将旧文件移出被监视的目录,假设它们已被处理。如果你想比这更小心,那么你必须实现自己的机制来跟踪处理工作的进度。上面提到的电子邮件线程包括一些关于如何做到这一点的想法。


推荐阅读