python - 按 yy/mm/dd 分区后的 Spark df partitioniong
问题描述
S3 托管一个非常大的压缩文件(20gb 压缩 -> 200gb 未压缩)。我想读取这个文件(不幸的是在单核上解压缩),转换一些 sql 列,然后以s3_path/year=2020/month=01/day=01/[files 1-200].parquet
格式输出到 S3。
整个文件将包含同一日期的数据。这让我相信而不是使用partitionBy('year','month','day')
我应该追加"year={year}/month={month}/day={day}/"
到 s3 路径,因为目前 spark 一次将单个文件写入 s3(每个 1gb 大小)。我的想法正确吗?
这是我目前正在做的事情:
df = df\
.withColumn('year', lit(datetime_object.year))\
.withColumn('month', lit(datetime_object.month))\
.withColumn('day', lit(datetime_object.day))
df\
.write\
.partitionBy('year','month','day')\
.parquet(s3_dest_path, mode='overwrite')
我在想什么:
df = spark.read.format('json')\
.load(s3_file, schema=StructType.fromJson(my_schema))\
.repartition(200)
# currently takes a long time decompressing the 20gb s3_file.json.gz
# transform
df.write\
.parquet(s3_dest_path + 'year={}/month={}/day={}/'.format(year,month,day))
解决方案
您可能遇到了 spark 先将数据写入某个 _temporary 目录,然后才将其提交到最终位置的问题。在 HDFS 中,这是通过重命名来完成的。但是 S3 不支持重命名,而是完全复制数据(仅使用一个执行器)。有关此主题的更多信息,请参见例如这篇文章:来自 EMR/Spark 的 S3 写入速度极慢
常见的解决方法是写入 hdfs,然后使用 distcp 从 hdfs 复制分发到 s3