首页 > 解决方案 > PySpark 数据从非分区 hive 表加载到分区 hive 表的性能调整

问题描述

我们需要通过 PySpark 将数据从未分区的外部配置单元表work_db.customer_tbl 提取到分区的外部配置单元表final_db.customer_tbl,之前是通过配置单元查询完成的。最终表由列load_date分区(load_date 列的格式为yyyy-MM-dd)。

所以我们有一个简单的 PySpark 脚本,它使用插入查询(与之前使用的 hive 查询相同),使用spark.sql()命令获取数据。但是我们有一些严重的性能问题,因为我们在摄取后尝试摄取的表有大约3000 个分区,每个分区都有大约 4 MB 的数据,除了最后一个分区大约 4GB。总表大小接近 15GB。此外,摄取后每个分区有217 个文件。决赛桌是一张活泼的压缩镶木地板桌。

源工作表有一个 15 GB 文件,文件名格式为customers_tbl_unload.dat

早些时候,当我们通过直线连接使用 hive 查询时,通常需要大约 25-30 分钟才能完成。现在,当我们尝试使用 PySpark 脚本时,大约需要 3 个小时才能完成。

我们如何调整火花性能以使摄取时间少于直线所需的时间。

The configurations of the yarn queue we use is:
Used Resources: <memory:5117184, vCores:627>
Demand Resources:   <memory:5120000, vCores:1000>
AM Used Resources:  <memory:163072, vCores:45>
AM Max Resources:   <memory:2560000, vCores:500>
Num Active Applications:    45
Num Pending Applications:   45
Min Resources:  <memory:0, vCores:0>
Max Resources:  <memory:5120000, vCores:1000>
Reserved Resources: <memory:0, vCores:0>
Max Running Applications:   200
Steady Fair Share:  <memory:5120000, vCores:474>
Instantaneous Fair Share:   <memory:5120000, vCores:1000>
Preemptable:    true

The parameters passed to the PySpark script is:
num-executors=50
executor-cores=5
executor-memory=10GB

PySpark code used:
insert_stmt = """INSERT INTO final_db.customers_tbl PARTITION(load_date) 
SELECT col_1,col_2,...,load_date FROM work_db.customer_tbl"""
spark.sql(insert_stmt)

即使在几乎使用了纱线队列的 10% 资源之后,这项工作也花费了很多时间。我们如何调整工作以使其更有效率。

标签: apache-sparkpysparkhiveapache-spark-sql

解决方案


您需要重新分析您的数据集,并通过在日期列上对 yoir 数据集进行分区来查看您是否使用了正确的方法,或者您是否应该按年份进行分区?要了解为什么每个分区最终会有 200 多个文件,您需要了解 Spark 和 Hive 分区之间的区别。您应该首先尝试的直接方法是将输入数据集作为数据帧读取,并按您计划在 Hive 中用作分区键的键对其进行分区,然后使用 df.write.partitionBy 将其保存

由于数据似乎在日期列上也存在偏差,因此请尝试将其分区到可能具有相同数据分布的其他列上。否则,过滤掉倾斜的数据并单独处理


推荐阅读