java - 如何在 Apache Flink 的每次循环迭代中写入文件?
问题描述
我是 Apache Flink 的新手。我必须过滤计算机中的所有文件并将它们写入一个文件。但在 Flink 中,似乎不可能写入现有文件。我该如何处理?
解决方案
您想要做的是使用所有文件作为 Flink 工作流的源,而不是在迭代中一次处理一个文件。通常,您通过向 Hadoop 作业配置添加路径来执行此操作,例如,这是我使用读取序列(二进制)文件的代码中的示例。
Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path(options.getCrawlDir()));
HadoopInputFormat<Tuple, Tuple> inputFormat = HadoopInputs.createHadoopInput(new SequenceFileInputFormat<Tuple, Tuple>(),
Tuple.class, Tuple.class, job);
DataSet<HomePageText> homePageData = env.createInput(inputFormat)
推荐阅读
- android - android 离线模式 - LRU 缓存或 Realm 数据库
- javascript - Nodejs等待socket然后响应http请求
- testing - 由于大量数据通过 Api 发布到数据库,Jmeter -GUI 在录制时挂起
- php - 我的简单 html php 表单没有插入数据库
- c# - 配置管理器的问题
- python-3.x - 散景 openStreetMap 磁贴在所有浏览器中不可见
- datafeed - RSA Archer 中的 Archer 到 Archer 数据馈送
- android - 安卓视频镜像效果
- azure - VSTS Azure App Service Deploy 发布任务不更新文件
- c++ - vcxproj 项目属性中未显示的附加包含路径