首页 > 解决方案 > 按 yy/mm/dd 分区后的 Spark df partitioniong

问题描述

S3 托管一个非常大的压缩文件(20gb 压缩 -> 200gb 未压缩)。我想读取这个文件(不幸的是在单核上解压缩),转换一些 sql 列,然后以s3_path/year=2020/month=01/day=01/[files 1-200].parquet格式输出到 S3。

整个文件将包含同一日期的数据。这让我相信而不是使用partitionBy('year','month','day')我应该追加"year={year}/month={month}/day={day}/"到 s3 路径,因为目前 spark 一次将单个文件写入 s3(每个 1gb 大小)。我的想法正确吗?

这是我目前正在做的事情:

df = df\
    .withColumn('year', lit(datetime_object.year))\
    .withColumn('month', lit(datetime_object.month))\
    .withColumn('day', lit(datetime_object.day))

df\
    .write\
    .partitionBy('year','month','day')\
    .parquet(s3_dest_path, mode='overwrite')

我在想什么:

df = spark.read.format('json')\
    .load(s3_file, schema=StructType.fromJson(my_schema))\
    .repartition(200)
# currently takes a long time decompressing the 20gb s3_file.json.gz

# transform
df.write\
    .parquet(s3_dest_path + 'year={}/month={}/day={}/'.format(year,month,day))

标签: pythondataframeapache-spark

解决方案


您可能遇到了 spark 先将数据写入某个 _temporary 目录,然后才将其提交到最终位置的问题。在 HDFS 中,这是通过重命名来完成的。但是 S3 不支持重命名,而是完全复制数据(仅使用一个执行器)。有关此主题的更多信息,请参见例如这篇文章:来自 EMR/Spark 的 S3 写入速度极慢

常见的解决方法是写入 hdfs,然后使用 distcp 从 hdfs 复制分发到 s3


推荐阅读