首页 > 解决方案 > 从 spark 中保存压缩的 json

问题描述

从 Spark RDDs,我想暂存 JSON 数据并将其存档到 AWS S3。压缩它才有意义,而且我有一个使用 hadoop 的进程GzipCodec,但是有些事情让我对此感到紧张。

当我查看org.apache.spark.rdd.RDD.saveAsTextFile这里的类型签名时:

https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.rdd.RDD

类型签名是:

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

但是当我在这里检查可用的压缩编解码器时:

https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.io.CompressionCodec

父特征CompressionCodec和子类型都说:

编解码器的有线协议不能保证跨版本的 Spark 兼容。这旨在用作单个 Spark 应用程序中的内部压缩实用程序

这不好......但没关系,因为 gzip 可能更容易跨生态系统处理。

类型签名说编解码器必须是CompressionCodec...的子类型,但我尝试了以下保存为 .gz,它工作正常,即使 hadoop 的 GzipCodec 不是<: CompressionCodec

import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsTextFile(bucketName, classOf[GzipCodec])

我的问题:

标签: jsonscalaapache-sparkhadoopgzip

解决方案


好吧,对于初学者来说,您是绑定到 RDD 还是可以使用 DataSets/DataFrames ?

使用 DataFrames,您可以使用类似的东西

 df.write.format("json").
    option("compression", "org.apache.hadoop.io.compress.GzipCodec").
    save("...")

但是,有一些注意事项。压缩很棒,但是如果你生成的文件很大,你必须记住 gzip 不是一种可拆分的格式,也就是说,如果你想稍后处理那个文件,它必须被一个人读取工人。例如,如果您的文件是不可拆分的并且为 1G,则需要 T 时间来处理,如果它是可拆分的(如 LZO、Snappy 或 BZip2),则可以在 T/N 中处理它,其中 N 是拆分的数量(假设 128MB 块,大约是 8 个)。这就是 Hadoop 使用 SequenceFiles(可拆分,并在一个块内使用 gzip)的原因,这也是为什么存储到 S3 时选择的压缩格式通常是 Parquet 的原因。Parquet 文件比 Gzipped 文件小,并且是可拆分的,也就是说,它的内容可以由多个工作人员处理。

归根结底,这实际上取决于您打算如何处理 S3 中的数据。

会被查询吗?在这种情况下,Parquet 是更好的格式选择。

它会被读取/复制到其他不理解 parquet 的系统吗?然后gzip压缩就OK了。而且它很稳定,您不必担心它会发生变化。您可以自己尝试,在 S3 上保存一些示例数据,您仍然可以使用任何 gzip 工具打开它。


推荐阅读