首页 > 解决方案 > Java Spark - 根据列数过滤RDD中的记录问题

问题描述

这是一个不同的问题,我试图根据列数过滤 RDD 中的记录。这更像是文件处理。

我在 Pyspark 中写过同样的内容,我看到记录正在正确过滤。当我在 Java 中尝试时,有效记录将进入错误文件夹。

下载错误文件并使用 AWK 进行验证,发现它们有 996 列,但仍然在错误中被过滤掉。

在 python 中,过滤的文件的确切数量是错误文件。

下面是片段。


JavaRDD<String> inputDataRDD = sc.textFile(args[0]+"/"+args[1], 5000);

int columnLength = Integer.parseInt(args[3]);

inputDataRDD
.filter(filterData -> filterData.split("\t").length == columnLength)
.coalesce(1)
.saveAsTextFile(args[2]+"Valid/", GzipCodec.class);

inputDataRDD
.filter(filterData -> filterData.split("\t").length != columnLength)
.coalesce(1)
.saveAsTextFile(args[2]+"Error/", GzipCodec.class);

片段结束。。

该文件中有近 10M 条记录。

Java和Python之间的sc.textfile(文件名,int numPartitions)是否有任何区别,或者我错过了什么。

需要你的帮助来找出我犯的错误。

注意:-使用 eclipse 构建了一个 maven 并在 Yarn 中运行了以下命令。

spark-submit --class com.virtualpairprogrammers.ProcessFilesToHDFS --master yarn learningSpark-0.0.1-SNAPSHOT.jar "/input/ABFeeds/" "ABFeeds_2020-04-20.tsv.gz" "/output/ABFeeds/2020-05-06/" 996

提前致谢

问候

山姆

标签: javaapache-sparkrdd

解决方案


问题在于我使用的拆分命令。问题:- 当最后一列为空时,Java 拆分无法将其视为一列。我参考了以下网站,该网站讨论了拆分问题

Java String split 删除了空值

旧片段:

inputDataRDD
    .filter(filterData -> filterData.split("\t").length == columnLength)
    .coalesce(1)
    .saveAsTextFile(args[2]+"Valid/", GzipCodec.class);

修改片段:

inputDataRDD
    .filter(filterData -> filterData.split("\t",-1).length == columnLength)
    .coalesce(1)
    .saveAsTextFile(args[2]+"Valid/", GzipCodec.class);

我已经测试过它并且它正在工作。

感谢大家的帮助。

问候


推荐阅读