首页 > 解决方案 > 如何在 pyspark 中设置动态 spark.sql.shuffle.partitions?

问题描述

我想spark.sql.shuffle.partitions动态设置 Spark (V 2.3) 配置,并且此配置用于多个 spark 应用程序。

代码:

Spark configuration
===================
spark = SparkConf() \
 .setAppName("My App") \ 
 .set('spark.executor.memory', '7g') \
 .set('spark.num.executors', '15') \
 .set('spark.executor.cores', '4') \
 .set('spark.yarn.executor.memoryOverhead', '3098m') \
 .set('spark.sql.shuffle.partitions', '1500') \
 .set('fs.s3.multipart.uploads.enabled', 'true')

 empsql = 'Select * From Employee'  #Only 30 records and 40 columns
 df = spark.sql(empsql) ##Spark is configured
 df.coalesce(2).write.mode('overwrite').format("parquet").option("delimiter",'|').save(s3_path, header = True) #coalesce cannot be changed to repartition due to restrictions 
**Error:** Spark Out of memeory issues
**Resolved:** By changing the above spark configuration to .set('spark.sql.shuffle.partitions', '2')

对于上述数据帧,通过更改为 .set('spark.sql.shuffle.partitions', '2') 来解决,问题是,它不适用于具有超过一百万条记录的 spark 应用程序,并且它需要 .set('spark.sql.shuffle.partitions', '1500')。

如何解决此问题以及如何使其动态化?

标签: apache-sparkpysparkapache-spark-sql

解决方案


评论的初步答案 -

实际上设置'spark.sql.shuffle.partitions','num_partitions'是改变随机分区默认设置的动态方式。这里的任务是选择最好的 num_partitions。选择最佳 numPartitions 的方法可以是 -

  1. 基于集群资源
  2. 基于您要应用此属性的数据大小

更新 1

来自 作者的查询-通过添加 .set('spark.sql.shuffle.partitions',num_partitions) 是否会根据数据大小动态计算分区数,例如最多占用 1500 个分区?

@K.Tom,这只会将属性值设置(覆盖默认值 = 200)spark.sql.shuffle.partitionsnum_partitions. 每当发生任何事情时都会使用相同的shuffling actionExchange在火花计划中)。请注意,如果您的集群没有足够的资源,此操作可能会失败。此外,如果您的集群有资源来容纳num_partitions,但数据集不是很大,那么您可能会使大部分分区为空,这又是维护(调度、处理和维护所有这些分区的元数据)的开销。由此得出结论,设置属性spark.sql.shuffle.partitions是艺术与科学的混合体。


推荐阅读