首页 > 解决方案 > Spark 在执行器中解压缩并将 CSV 写入镶木地板

问题描述

我的问题是我的 CSV 文件采用 ZIP.csv.zip格式,所以我不能.csv像使用.csv.gzip | .csv.gz. 这意味着我需要解压缩文件,读取内容(文件很大~5gb)并将它们写为镶木地板文件。

我的方法是这样的:

String paths = "s3a://...,s3a://...,...";
JavaRDD<Tuple2<String, PortableDataStream>> zipRDD = context.binaryFiles(paths, sparkContext.context.defaultParallelism()).toJavaRDD();
JavaRDD<Tuple2<String, List<Row>>> filenameRowsRDD = zipRDD.flatMap(new ConvertLinesToRows());  

第一个JavaRDD返回一对Filename, InputStream. 然后将其传递给类ConvertLinesToRows,该类调用ZipInputStream读取 CSV 文件的内容,并为每一行创建一个新的火花Row,最后返回Filename, List<Row>列表包含从 CSV 转换为行的所有行的元组对。

我现在想将每个读取的 CSV 保存为parquet文件。

filenameRowsRDD.foreach(tuple -> {
    SparkContext newContext = MySparkConfig.createNewContext();
    SparkSession newSpark = SparkSession.builder()
        .sparkContext(newContext)
        .getOrCreate();

    Dataset<Row> dataset = newSpark.createDataFrame(tuple._2, datasetSchema.value());
    dataset.write().parquet("s3a://...");
});

我在我的执行程序中重新创建了 SparkSession,以便能够使用 SparkSession.write。

这个想法是这一切都将在执行程序中运行(我希望如此)。但是,使用这种方法,当执行程序想要写入此输出文件时,读取文件不是问题。抛出异常:A master URL must be set in your configuration

这似乎我正在做一些防火花的事情。它也不起作用。我也尝试过广播我的SparkSession但是SparkSession在尝试编写之前会在里面抛出一个 NPE。

  1. 在这里解决我的问题的正确方法是什么?
  2. 这样做的火花方式是什么。

以上所有代码都在我的main()方法中。我是否正确假设第一个zipRDDmaster节点上运行,第二个filenameRowsRDD在执行程序节点上运行以及.foreach.

标签: apache-sparkapache-spark-sql

解决方案


推荐阅读