scala - Which option to choose for writing CSV file in Spark (HDFS)?
Problem description
I have to compare CSV files and remove all duplicate rows. I have one folder that holds every filtered result. When a new file arrives, I have to compare it with the files already in that folder and then write the deduplicated result back to the same folder.
For example:

/data/ingestion/file1.csv
a1 b1 c1
a2 b2 c2
a3 b3 c3

/data/ingestion/file2.csv
a4 b4 c4
a5 b5 c5
a6 b6 c6

New incoming file (upcoming_file.csv):
a1 b1 c1
a5 b5 c5
a7 b7 c7
My approach is to create one DataFrame from all the files present in /data/ingestion/*, create a second DataFrame from upcoming_file.csv, combine the two with a union operation, and finally apply the distinct transformation. I then have to write the result back to /data/ingestion, making sure that no duplicates remain, so I chose the overwrite mode:
deleted_duplicate.write
.format("csv")
.mode("overwrite")
.save("hdfs://localhost:8020/data/ingestion/")
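For reference, here is a minimal sketch of how the `deleted_duplicate` DataFrame used above could be built from the union-and-distinct approach. The object name `DedupSketch`, the helper `mergeDistinct`, and the location of upcoming_file.csv are hypothetical; the question does not say where the incoming file lives.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object DedupSketch {
  // Append the incoming rows to the existing rows and drop exact duplicates.
  // union matches columns by position, so both inputs need the same column layout.
  def mergeDistinct(existing: DataFrame, incoming: DataFrame): DataFrame =
    existing.union(incoming).distinct()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dedup-sketch").getOrCreate()

    // All files already in the ingestion folder (path taken from the question).
    val existing = spark.read.csv("hdfs://localhost:8020/data/ingestion/*")
    // Hypothetical location of the new file; adjust to wherever it actually arrives.
    val incoming = spark.read.csv("hdfs://localhost:8020/data/incoming/upcoming_file.csv")

    val deleted_duplicate = mergeDistinct(existing, incoming)
    deleted_duplicate.show()

    spark.stop()
  }
}
```

Note that `deleted_duplicate` is evaluated lazily: nothing is read from /data/ingestion until an action such as `show()` or a write runs.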
However, this ends up deleting everything inside the /data/ingestion folder, and the new DataFrame is not written out as CSV files either.
I have tried other options as well, but none of them achieved what I described above.
Thanks in advance!
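Solution
I suggest writing the output to a new directory on HDFS. If processing fails, you can always discard whatever was written and start again from the original data, which is both safe and simple. :)
Once processing has completed, just delete the old directory and rename the new one to the old name.
Update: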
deleted_duplicate.write
.format("csv")
.mode("overwrite")
.save("hdfs://localhost:8020/data/ingestion_tmp/")
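// Java: once the write has succeeded, replace the old directory with the new one
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem hdfs = FileSystem.get(URI.create("hdfs://<namenode-hostname>:<port>"), conf);
hdfs.delete(new Path("hdfs://localhost:8020/data/ingestion"), true); // true = delete recursively
hdfs.rename(new Path("hdfs://localhost:8020/data/ingestion_tmp"), new Path("hdfs://localhost:8020/data/ingestion"));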
See the HDFS FileSystem API documentation for details.
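Since the question is in Scala, the same write-then-swap steps can be sketched directly in the Spark job. This is only a sketch under a few assumptions: the object `SwapSketch` and the helper `writeAndSwap` are hypothetical names, and the FileSystem is obtained from Spark's own Hadoop configuration rather than from a hand-built `Configuration`.

```scala
import java.io.IOException
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

object SwapSketch {
  // Write df to tmpDir, then replace targetDir with the freshly written directory.
  def writeAndSwap(spark: SparkSession, df: DataFrame, targetDir: String, tmpDir: String): Unit = {
    val target = new Path(targetDir)
    val tmp = new Path(tmpDir)

    // save() is an action: the source files under targetDir are fully read
    // and the result is written to tmpDir before anything is deleted.
    df.write.format("csv").mode("overwrite").save(tmpDir)

    val fs = target.getFileSystem(spark.sparkContext.hadoopConfiguration)
    if (fs.exists(target)) {
      fs.delete(target, true) // true = delete recursively
    }
    // rename returns false on failure instead of throwing, so check the result.
    if (!fs.rename(tmp, target)) {
      throw new IOException(s"Could not rename $tmp to $target")
    }
  }
}
```

With the paths from the question, a call might look like `SwapSketch.writeAndSwap(spark, deleted_duplicate, "hdfs://localhost:8020/data/ingestion", "hdfs://localhost:8020/data/ingestion_tmp")`. The delete and the rename are two separate steps, so a failure between them leaves only the temporary directory in place; the data is still there and can be renamed by hand.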