apache-spark - Spark 在执行器中解压缩并将 CSV 写入镶木地板
问题描述
我的问题是我的 CSV 文件采用 ZIP.csv.zip
格式,所以我不能.csv
像使用.csv.gzip | .csv.gz
. 这意味着我需要解压缩文件,读取内容(文件很大~5gb)并将它们写为镶木地板文件。
我的方法是这样的:
String paths = "s3a://...,s3a://...,...";
JavaRDD<Tuple2<String, PortableDataStream>> zipRDD = context.binaryFiles(paths, sparkContext.context.defaultParallelism()).toJavaRDD();
JavaRDD<Tuple2<String, List<Row>>> filenameRowsRDD = zipRDD.flatMap(new ConvertLinesToRows());
第一个JavaRDD
返回一对Filename, InputStream
. 然后将其传递给类ConvertLinesToRows
,该类调用ZipInputStream
读取 CSV 文件的内容,并为每一行创建一个新的火花Row
,最后返回Filename, List<Row>
列表包含从 CSV 转换为行的所有行的元组对。
我现在想将每个读取的 CSV 保存为parquet
文件。
filenameRowsRDD.foreach(tuple -> {
SparkContext newContext = MySparkConfig.createNewContext();
SparkSession newSpark = SparkSession.builder()
.sparkContext(newContext)
.getOrCreate();
Dataset<Row> dataset = newSpark.createDataFrame(tuple._2, datasetSchema.value());
dataset.write().parquet("s3a://...");
});
我在我的执行程序中重新创建了 SparkSession,以便能够使用 SparkSession.write。
这个想法是这一切都将在执行程序中运行(我希望如此)。但是,使用这种方法,当执行程序想要写入此输出文件时,读取文件不是问题。抛出异常:A master URL must be set in your configuration
。
这似乎我正在做一些防火花的事情。它也不起作用。我也尝试过广播我的SparkSession
但是SparkSession
在尝试编写之前会在里面抛出一个 NPE。
- 在这里解决我的问题的正确方法是什么?
- 这样做的火花方式是什么。
以上所有代码都在我的main()
方法中。我是否正确假设第一个zipRDD
在master
节点上运行,第二个filenameRowsRDD
在执行程序节点上运行以及.foreach
.
解决方案
推荐阅读
- google-chrome-extension - 使用 Chrome 扩展程序根据标题阻止网站
- mysql - MySQL:添加以前和当前作为字段“学年”的默认值
- crc - 是否存在已知擅长检测位滑的 CRC 多项式?
- html - 在 Textarea 上打开 Chrome 自动完成功能
- google-sheets - 对付款类型进行分组并汇总总和
- wordpress - WordPress:无法安装或更新插件
- sql - 如何在每月数据计数中包含计数为 0 的月份名称?
- java - 如何检测文件是否已重命名?
- java - Apache Commons IO 仅下载第一个 PDF 页面
- ios - AudioKit 将所有音频录制到文件中