首页 > 解决方案 > 使用 Spark RDD 保存和加载整个文本文件

问题描述

我需要在 spark 中对一些文本文件进行批处理。基本上有人给了我大量畸形的 csv 文件。它们包含多行任意文本格式的标题数据,然后是多行格式正确的 csv 数据。我需要将此数据拆分为两个文件,或者至少以某种方式摆脱标题。

无论如何,我读过你可以得到一个格式如下的 RDD:

[(文件名,内容)]

通过使用

火花 \ .sparkContext \ .wholeTextFiles(input_files_csv)

然后我想在这个 RDD 上执行映射操作,这会导致另一种格式与原始格式完全相同

[(新文件名,内容)]

然后我希望集群将这些内容保存在这些文件名下。

我找不到可以为我执行此操作的写入命令。我可以保存 RDD raw,但我不能将它保存为普通文件,然后我可以将其作为数据帧读取。

我想我可以删除标题,然后将文件名另存为一个巨大的 csv 作为新列,但我觉得这样不会那么有效。

有没有人可以解决我的问题?

标签: apache-sparkpysparkrdd

解决方案


这是 Scala,但它在 Python 中应该不会太远。在“foreach”中,我没有使用任何特定于 spark 的东西来编写文件,只是使用常规的 Hadoop API。

sc.wholeTextFiles("/tmp/test-data/")
  .foreach{ x =>
    val filename = x._1
    val content = x._2
    val fs = FileSystem.get(new Configuration())
    val output = fs.create(new Path(s"${filename}-copy"))
    val writer = new PrintWriter(output)
    writer.write(content)
    writer.close
  }

推荐阅读