apache-spark - 使用 Spark RDD 保存和加载整个文本文件
问题描述
我需要在 spark 中对一些文本文件进行批处理。基本上有人给了我大量畸形的 csv 文件。它们包含多行任意文本格式的标题数据,然后是多行格式正确的 csv 数据。我需要将此数据拆分为两个文件,或者至少以某种方式摆脱标题。
无论如何,我读过你可以得到一个格式如下的 RDD:
[(文件名,内容)]
通过使用
火花 \ .sparkContext \ .wholeTextFiles(input_files_csv)
然后我想在这个 RDD 上执行映射操作,这会导致另一种格式与原始格式完全相同
[(新文件名,内容)]
然后我希望集群将这些内容保存在这些文件名下。
我找不到可以为我执行此操作的写入命令。我可以保存 RDD raw,但我不能将它保存为普通文件,然后我可以将其作为数据帧读取。
我想我可以删除标题,然后将文件名另存为一个巨大的 csv 作为新列,但我觉得这样不会那么有效。
有没有人可以解决我的问题?
解决方案
这是 Scala,但它在 Python 中应该不会太远。在“foreach”中,我没有使用任何特定于 spark 的东西来编写文件,只是使用常规的 Hadoop API。
sc.wholeTextFiles("/tmp/test-data/")
.foreach{ x =>
val filename = x._1
val content = x._2
val fs = FileSystem.get(new Configuration())
val output = fs.create(new Path(s"${filename}-copy"))
val writer = new PrintWriter(output)
writer.write(content)
writer.close
}
推荐阅读
- c# - ASP.NET Web API 将图像上传到 SQL Server 数据库
- java - 使用 Base32 编码和解码字符串
- arrays - 有没有办法在VBA中总结数组的指定段落
- linux - 如何从 bzimage(自定义内核)创建 uefi 可启动映像
- linux - shell脚本中的成绩计算
- nginx - 我们如何将请求的请求参数与 Nginx 中的字符串数组进行比较
- php - 将 web 应用程序转换为 android 和 ios 应用程序
- angular - Angular 4 - 限制特定范围的用户输入
- configuration-files - linux mint 面板小程序配置文件在哪里?
- yii2 - mkdir(): 文件存在 (Yii2)