Which option to choose for writing CSV file in Spark (HDFS)?

Problem Description

I have to compare CSV files and then delete all duplicate rows. My setup is as follows: I have one folder into which every filtered result is written, and whenever a new file arrives I have to compare it with the files already in the folder and write the deduplicated result back to that same folder.

e.g. /data/ingestion/file1.csv:

   a1 b1 c1

   a2 b2 c2

   a3 b3 c3

/data/ingestion/file2.csv

   a4 b4 c4

   a5 b5 c5

   a6 b6 c6

New upcoming file (upcoming_file.csv):

   a1 b1 c1

   a5 b5 c5

   a7 b7 c7

Now my approach is to create one dataframe from all the files present in /data/ingestion/*, create another dataframe from upcoming_file.csv, and append the two with a union operation. Finally, I apply a distinct transformation. Then I have to write the result back to /data/ingestion, making sure no duplicates remain, so I chose the overwrite mode.
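In code, the pipeline looks roughly like this (a minimal sketch - the SparkSession setup and the path of upcoming_file.csv are assumed for illustration, since only the folder layout is shown above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dedup").getOrCreate()

// all CSV files already present in the ingestion folder
val existing = spark.read.format("csv").load("hdfs://localhost:8020/data/ingestion/*")

// the newly arrived file (assumed location, for illustration)
val upcoming = spark.read.format("csv").load("hdfs://localhost:8020/data/upcoming_file.csv")

// append both and drop duplicate rows
val deleted_duplicate = existing.union(upcoming).distinct()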

deleted_duplicate.write
  .format("csv")
  .mode("overwrite")
  .save("hdfs://localhost:8020/data/ingestion/")

The result is that everything inside the folder /data/ingestion gets deleted - and the new dataframe is not written out as CSV files either.

I have tried other options as well, but I have not achieved the behaviour I described above!

Thanks in advance!

Tags: scala, apache-spark, hadoop, dataframe, hdfs

Solution


I would suggest writing the output to a new directory on HDFS - if the processing fails, you will always be able to discard whatever was produced and start over from the original data - it is both safe and simple. :) (Reading from and overwriting the same directory cannot work here: Spark evaluates lazily, so mode("overwrite") clears /data/ingestion before the dataframe built on top of it has actually been read - which is why the folder ends up empty.)

Once processing is finished - just delete the old directory and rename the new one to the old name.

Update:

// write the deduplicated result to a temporary directory first
deleted_duplicate.write
  .format("csv")
  .mode("overwrite")
  .save("hdfs://localhost:8020/data/ingestion_tmp/")

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem hdfs = FileSystem.get(URI.create("hdfs://<namenode-hostname>:<port>"), conf);
// delete() and rename() take Path objects; true makes the delete recursive
hdfs.delete(new Path("hdfs://localhost:8020/data/ingestion"), true);
hdfs.rename(new Path("hdfs://localhost:8020/data/ingestion_tmp"), new Path("hdfs://localhost:8020/data/ingestion"));
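If you prefer to stay in Scala, the same delete-and-rename step can be done through the same Hadoop FileSystem API - a minimal sketch, assuming the namenode at localhost:8020 as in the question:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val hdfs = FileSystem.get(URI.create("hdfs://localhost:8020"), conf)

// drop the old data, then move the freshly written directory into its place
hdfs.delete(new Path("/data/ingestion"), true) // true = recursive
hdfs.rename(new Path("/data/ingestion_tmp"), new Path("/data/ingestion"))

Inside a Spark job you can also reuse spark.sparkContext.hadoopConfiguration instead of building a fresh Configuration.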

Here is a link to the HDFS FileSystem API documentation.

