首页 > 技术文章 > Spark调优之--资源调优、并行度调优

guoyu1 2020-01-08 15:05 原文

一、给足资源:

1、搭建集群的时候:给足SPARK_WORKER_CORES、SPARK_WORKER_MEMORY。

2、提交任务的时候:生产环境中提交任务使用的是shell脚本,要根据实际的情况设置好里面的参数。

/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置 executor 的数量
--driver-memory 100m \ 配置 driver 的内存(影响不大)
--executor-memory 100m \ 配置每个 executor 的内存大小
--executor-cores 3 \ 配置每个 executor 的 cpu core 数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

调节依据:

Yarn:资源队列。资源调度。应该去查看,你的 spark 作业,要提交到的资源队列,大概有多少资源?

500G 内存,100 个 cpu core;executor,50;10G 内存,2 个 cpu core,平均每个 executor。

 

 调节资源以后,性能为什么会提升:

(1)excutor的数量增多,意味着spark集群可以并行处理的task的数量增多了,一个excutor的cpu core处理一个task。

例如:原来集群中只有10个excutor,每个excutor有2个cpu core,那么集群一批能够并行跑20个task。

若我们将excutor的数量提升到50个,每个excutor有2个cpu core,那么集群一批就能并行的跑100个task。

(2)excutor的cpu core:同上。

(3)excutor的内存:主要为以下三方面的作用。

a、若程序中需要对RDD进行缓存,那么就可以缓存更多的数据到内存,将更少的数据写入磁盘,甚至不写磁盘,减少磁盘的IO。

b、在shuffle阶段,reduce端会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。增大内存,也可以减少shuffle阶段的磁盘io,提升shuffle的性能。

c、在task执行过程中,可能会频繁的创建对象,若excutor内存小了,就会导致JVM堆经常满掉。因此会频繁的GC,导致程序运行过慢。内存加大后,GC减少,速度加快。

 

二、提高并行度:

并行度:Spark 作业中, 各个 stage 的 的 task 数量,也就代表了 Spark 作业的在各个阶段(stage)的并行度。

  因此要合理设置并行度,就可以完全充分利用集群的计算资源,并且减少每个 task 要处理的数据量,最终,就是提升整个 Spark 作业的性能和运行速度。

1、sparkConf中设置task的数量:要合理设置task数量,设置多少合适。

  a、理想情况的数量:task 的数量至少要设置成与 Spark application 的总 cpu core 数量相同(最理想情况,比如总共 150 个 cpu core,分配了 150 个 task,一起运行,差不多同一时间运行完毕)

  b、官方推荐的数量:task 数量,设置成 spark application 总 cpu core 数量的 2~3 倍,比如 150 个cpu core,基本要设置 task 数量为 300~500;

  实际情况,与理想情况不同的,有些 task 会运行的快一点,比如 50s 就完了,有些 task,可能会慢一点,要 1 分半才运行完,所以如果你的 task 数量,刚好设置的跟 cpu core 数量相同,可能还是会导致资源的浪费,因为,比如 150 个 task,10 个先运行完了,剩余 140 个还在运行,但是这个时候,有 10 个 cpu core 就空闲出来了,就导致了浪费。那如果 task 数量设置成 cpu core 总数的 2~3 倍,那么一个 task 运行完了以后,另一个 task 马上可以补上来,就尽量让 cpu core 不要空闲,同时也是尽量提升 spark 作业运行的效率和速度,提升性能。

  SparkConf conf = new SparkConf().set("spark.default.parallelism", "500") // 即设置 task 的数量,全局都有效的

  如果conf设置了 spark.default.parallelism 这个属性,那么在程序中的算子,例如groupByKey操作(这里的groupByKey指shuffle操作的算子) 不指定参数时会默认到的读取设置的默认并行度参数。

2、程序中设定并行度

sc.textFile(xx,minNumpartition)

sc.parallelize(xx,num)

sc.parallelizePairs(xx,num)

sc.makeRDD(xx,num)

reduceByKey(xx,num) ,distinct(num),join(xx,num) .... 

repartition/coalesce(num,shffle=true)

自定义分区器提高RDD的并行度

spark.sql.shuffle.partitions  = 200

SparkStreaming中 :

Receiver:spark.streaming.blockInterval = 200ms

Direct:与读取的topic的partition个数 一致

 

spark shell中设置并行度:

--conf spark.default.parallelism=500

spark.default.parallelism For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:
  • Local mode: number of cores on the local machine
  • Mesos fine grained mode: 8
  • Others: total number of cores on all executor nodes or 2, whichever is larger
Default number of partitions in RDDs returned by transformations like joinreduceByKey, and parallelize when not set by user. 0.5.0

 

3、repartition 解决 Spark SQL 低并行度的性能问题

1、spark.default.parallelism

2、textFile(),传入第二个参数,指定 partition 数量(比较少用)

  在生产环境中,是最好自己设置一下的。官网有推荐的设置方式,你的 spark-submit 脚本中, 会指定你的 application 总共要启动多少个 executor,100 个;每个 executor 多少个 cpu core, 2~3 个;总共 application,有 cpu core,200 个。

  官方推荐,根据你的 application 的总 cpu core 数量(在 spark-submit 中可以指定,200 个), 自己手动设置 spark.default.parallelism 参数,指定为 cpu core 总数的 2~3 倍。400~600 个并 行度。600。

  设置的这个并行度,在哪些情况下会生效?哪些情况下,不会生效?

  如果没有使用 Spark SQL(DataFrame),那么你整个 spark application 默认所有 stage 的并 行度都是你设置的那个参数。(除非你使用 coalesce 算子缩减过 partition 数量)

  问题来了:如果使用了 Spark SQL。用 Spark SQL 的那个 stage 的并行度,你没法自己指定。 Spark SQL 自己会默认根据 hive 表对应的 hdfs 文件的 block,自动设置 Spark SQL 查询所 在的那个 stage 的并行度。你自己通过 spark.default.parallelism 参数指定的并行度,只会在 没有 Spark SQL 的 stage 中生效。

  比如你第一个 stage,用了 Spark SQL 从 hive 表中查询出了一些数据,然后做了一些 transformation 操作,接着做了一个 shuffle 操作(groupByKey);下一个 stage,在 shuffle 操 作之后,做了一些 transformation 操作。hive 表,对应了一个 hdfs 文件,有 20 个 block;你 自己设置了 spark.default.parallelism 参数为 100。

  你的第一个 stage 的并行度,是 不受你的控制的,就只有 20 个 task;第二个 stage 的并行度, 才是你自己设置的 100。

  产生的问题?

  Spark SQL 默认情况下,它的那个并行度,咱们没法设置。可能导致的问题,也许没什么问 题,也许很有问题。Spark SQL 所在的那个 stage 中,后面的那些 transformation 操作,可 能会有非常复杂的业务逻辑,甚至说复杂的算法。如果你的 Spark SQL 默认把 task 数量设 置的很少,20 个,然后每个 task 要处理很大的数据量,然后还要执行特别复杂的算法。 这个时候,就会导致第一个 stage 的速度,特别慢。第二个 stage,1000 个 task,刷刷刷, 非常快。

解决方法:

  repartition 算子,Spark SQL 这一步的并行度和 task 数量,肯定是没有办法去改变了。但 是呢,可以用于 Spark SQL 查询出来的 RDD,使用 repartition 算子,去重新进行分区,此 时可以分区成多个 partition,比如从 20 个 partition,分区成 100 个。
然后呢,从 repartition 以后的 RDD,再往后,并行度和 task 数量,就会按照你预期的来了。 就可以避免跟 Spark SQL 绑定在一个 stage 中的算子,只能使用少量的 task 去处理大量数据 以及复杂的算法逻辑。

 

 

转载博客:https://www.cnblogs.com/frankdeng/p/9301780.html

 

推荐阅读