首页 > 解决方案 > 保存 Spark Java RDD,以便将每个 RDD 值保存到单独文件夹中的单独文件中

问题描述

我正在使用带有 Java 1.8 的 Spark 2.3

我有一个 CSV 记录的 RDD 说:

JavaRDD<CsvRecordsPerApp> csvRecordsRdd

这里每个CsvRecordsPerApp都有多个值:

class CsvRecordsPerApp implements Serializable {
    String customerName;
    String supplierName;
    String otherFieldName;
} 

我想将它保存在多个文件夹中,以便每个 RDD 保存到 3 个单独的文件夹中,例如

- customerNames\part-0000
- customerNames\part-0001
...
- supplierNames\part-0000
- supplierNames\part-0001
...

- otherFieldNames\part-0000
- otherFieldNames\part-0001
...

但是当我在下面执行时,它将所有输出文件保存在单个文件中:

JavaRDD<CsvRecordsPerApp> csvRecordsRdd = ...
csvRecordsRdd.saveAsTextFile("file-name");

喜欢:

file-name/0000
file-name/0001
..

我尝试将 csvRecordsRdd 映射到不同的值并保存 3 次,如下所示:

JavaRDD<String> customerNameRdd = csvRecordsRdd.map(csv -> csv.getCustomerName());
customerNameRdd.saveAsTextFile("customerNames");

JavaRDD<String> supplierNameRdd = csvRecordsRdd.map(csv -> csv.getSupplierName());
supplierNameRdd.saveAsTextFile("supplierNames");

JavaRDD<String> otherFieldNameRdd = csvRecordsRdd.map(csv -> csv.getOtherFieldName());
otherFieldNameRdd.saveAsTextFile("otherFieldName");

这里的问题是它重新计算 RDD 3 次,我有三个条目!

然后停止重新计算,我尝试了下面的缓存,但它没有工作,仍然计算 3 次:

csvRecordsRdd.persist(StorageLevel.MEMORY_AND_DISK()); or csvRecordsRdd.cache();

我正在寻找解决问题的想法

标签: javafileapache-sparkrdd

解决方案


这里缓存的解决方案有效(对不起,我忘了早点更新)。

因为我将其他配置,如 spart-submit 驱动程序执行程序内存从 1 gb(默认)更改为 20 gb 左右(取决于您的系统可用性,例如在我的桌面上,我将其增加到 5 gb,但在 EMR 上,我将其增加到 20 gb 或更多)。

我认为这只是一种解决方法,因为它会缓存对象。缓存有一个限制,因此对于更大的数据它可能会失败,并且肯定需要更大的 m/c。

所以,请提出更多更好的解决方案。


推荐阅读