首页 > 解决方案 > 将大文件写入 S3 的最佳方法是什么?

问题描述

我正在使用 zeppelin 和 spark,我想从 S3 获取一个 2TB 文件并在 Spark 中对其进行转换,然后将其发送到 S3,以便我可以在 Jupyter notebook 中使用该文件。转换非常简单。

我正在将文件作为镶木地板文件读取。我认为它大约是 2TB,但我不确定如何验证。

它大约有 10M 行和 5 列,所以它非常大。

我试着做my_table.write.parquet(s3path),我试过my_table.write.option("maxRecordsPerFile", 200000).parquet(s3path)。我如何想出编写大拼花文件的正确方法?

标签: scalaapache-sparkpysparkparquetapache-zeppelin

解决方案


这些是你可以考虑的点...

1) maxRecordsPerFile 设置:

my_table.write.parquet(s3path)

Spark 为每个任务写入一个文件。

保存的文件数=正在保存的RDD的分区数/Dataframe。因此,这可能会导致文件大得离谱(当然,您可以重新分区数据并保存重新分区意味着跨网络打乱数据。)。

限制每个文件的记录数

my_table.write.option("maxRecordsPerFile", numberOfRecordsPerFile..yourwish).parquet(s3path)

它可以避免生成巨大的文件。

2) 如果您使用的是 AWS Emr (Emrfs),这可能是您可以考虑的要点之一。

emr-spark-s3-optimized-committer

当 EMRFS S3-optimized Committer 未使用时:

  • 使用 S3A 文件系统时。
  • 使用 Parquet 以外的输出格式时,例如 ORC 或文本。

3)使用压缩技术,算法版本和其他火花配置:

.config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", 2)
.config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", true)
.config("spark.hadoop.parquet.enable.summary-metadata", false)
.config("spark.sql.parquet.mergeSchema", false)
.config("spark.sql.parquet.filterPushdown", true) // for reading purpose 
.config("mapreduce.fileoutputcommitter.algorithm.version", "2")
.config("spark.sql.parquet.compression.codec", "snappy")
.getOrCreate()

4)快速上传和其他道具,以防您使用 s3a:

  .config("spark.hadoop.fs.s3a.fast.upload","true")
  .config("spark.hadoop.fs.s3a.fast.upload","true")
  .config("spark.hadoop.fs.s3a.connection.timeout","100000")
  .config("spark.hadoop.fs.s3a.attempts.maximum","10")
  .config("spark.hadoop.fs.s3a.fast.upload","true")
  .config("spark.hadoop.fs.s3a.fast.upload.buffer","bytebuffer")
  .config("spark.hadoop.fs.s3a.fast.upload.active.blocks","4")
  .config("fs.s3a.connection.ssl.enabled", "true")

推荐阅读