java - Java Spark - 根据列数过滤RDD中的记录问题
问题描述
这是一个不同的问题,我试图根据列数过滤 RDD 中的记录。这更像是文件处理。
我在 Pyspark 中写过同样的内容,我看到记录正在正确过滤。当我在 Java 中尝试时,有效记录将进入错误文件夹。
下载错误文件并使用 AWK 进行验证,发现它们有 996 列,但仍然在错误中被过滤掉。
在 python 中,过滤的文件的确切数量是错误文件。
下面是片段。
JavaRDD<String> inputDataRDD = sc.textFile(args[0]+"/"+args[1], 5000);
int columnLength = Integer.parseInt(args[3]);
inputDataRDD
.filter(filterData -> filterData.split("\t").length == columnLength)
.coalesce(1)
.saveAsTextFile(args[2]+"Valid/", GzipCodec.class);
inputDataRDD
.filter(filterData -> filterData.split("\t").length != columnLength)
.coalesce(1)
.saveAsTextFile(args[2]+"Error/", GzipCodec.class);
片段结束。。
该文件中有近 10M 条记录。
Java和Python之间的sc.textfile(文件名,int numPartitions)是否有任何区别,或者我错过了什么。
需要你的帮助来找出我犯的错误。
注意:-使用 eclipse 构建了一个 maven 并在 Yarn 中运行了以下命令。
spark-submit --class com.virtualpairprogrammers.ProcessFilesToHDFS --master yarn learningSpark-0.0.1-SNAPSHOT.jar "/input/ABFeeds/" "ABFeeds_2020-04-20.tsv.gz" "/output/ABFeeds/2020-05-06/" 996
提前致谢
问候
山姆
解决方案
问题在于我使用的拆分命令。问题:- 当最后一列为空时,Java 拆分无法将其视为一列。我参考了以下网站,该网站讨论了拆分问题
旧片段:
inputDataRDD
.filter(filterData -> filterData.split("\t").length == columnLength)
.coalesce(1)
.saveAsTextFile(args[2]+"Valid/", GzipCodec.class);
修改片段:
inputDataRDD
.filter(filterData -> filterData.split("\t",-1).length == columnLength)
.coalesce(1)
.saveAsTextFile(args[2]+"Valid/", GzipCodec.class);
我已经测试过它并且它正在工作。
感谢大家的帮助。
问候
推荐阅读
- java - oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:513)
- git - 同事推后变基怎么办?
- python - 使用 python 获取所有视频的持续时间
- python - 如何使用人脸级联自定义人脸检测参数
- javascript - 从流类型创建一个数组?
- flexbox - 图标下的文本,flexbox
- python - 如何在自定义变压器中访问 sklearn 列变压器
- prometheus - 如何从 Prometheus 中的相同指标中提取不同的过去值?
- javascript - 如何使用 JavaScript 或 jQuery 进行 json 格式化
- javascript - 带有列表项菜单的代码 javascript 有什么问题?