首页 > 解决方案 > 使用 Pyspark 将文件从一个目录移动到 HDFS 中的另一个目录

问题描述

我正在尝试从一个目录中读取所有 JSON 文件的数据,并使用下面的代码将它们存储在 Spark Dataframe 中。(它工作正常)

spark = SparkSession.builder.getOrCreate()


df = spark.read.json("hdfs:///user/temp/backup_data/st_in_*/*/*.json",multiLine=True)

但是当我尝试用多个文件保存 DataFrame 时,使用下面的代码

df.write.json("hdfs:///user/another_dir/to_save_dir/")

它没有按预期存储文件并抛出to_save_dir已经存在的错误

我只想保存文件,就像我从源目录读取到目标目录一样。

编辑:

问题是,当我读取多个文件并想将其写入一个目录时,Pyspark 中的程序是什么?我问这个的原因是因为一旦火花加载了所有文件,它就会创建一个数据帧,并且每个文件都是这个数据帧中的一行,我应该如何继续为数据帧中的每一行创建新文件

标签: pythonapache-sparkpysparkhdfs

解决方案


您得到的错误很清楚,您尝试写入的位置似乎已经存在。您可以通过指定来选择覆盖它mode

df.write.mode("overwrite").json("hdfs:///user/another_dir/to_save_dir/")

但是,如果您的意图只是在 HDFS 中将文件从一个位置移动到另一个位置,则无需在 Spark 中读取文件然后再写入它们。相反,请尝试使用Hadoop FS API

conf = sc._jsc.hadoopConfiguration()
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileUtil = sc._gateway.jvm.org.apache.hadoop.fs.FileUtil

src_path = Path(src_folder)
dest_path = Path(dest_folder)

FileUtil.copy(src_path.getFileSystem(conf), 
              src_path,
              dest_path.getFileSystem(conf),
              dest_path,
              True,
              conf)

推荐阅读