首页 > 解决方案 > 计数操作导致更多 rack_local pyspark

问题描述

我试图了解 Spark 集群上的位置级别及其与 RDD 分区数的关系以及对其执行的操作。具体来说,我有一个分区数为 9647 的数据框。然后,我df.count对其执行并在 Spark UI 中观察到以下内容:

在此处输入图像描述

一些上下文,我使用以下配置将我的工作提交给了 Yarn 集群:

- executor_memory='10g',
- driver_memory='10g',
- num_executors='5',
- executor_cores=5'

另外,我注意到所有的执行者都来自 5 个不同的节点(主机)。

从图中我发现,在所有 9644 个任务中,超过 95% 的任务不是在同一个节点内运行。所以,我只是想知道有很多 rack_local 的原因。具体来说,为什么节点不选择最近的数据源来执行,换句话说,有更多的本地节点?

谢谢

标签: apache-sparkcluster-computinghadoop-yarn

解决方案


这里有几点需要考虑。

您可以在下面找到一些影响 Spark 中数据局部性的因素:

  1. 最初,Spark 会尝试将任务尽可能靠近源数据所在的节点。例如,如果源系统是 HDFS,Spark 将尝试在特定分区的数据所在的同一节点上执行任务。Spark 将通过实现getPreferredLocations. 稍后,TaskScheduler 将利用此信息来决定任务的位置。在RDD的定义中,您可以找到 getPreferredLocations负责指定最优的定义RDD 的位置。例如,如果源是 HDFS,Spark 将创建一个 HadoopRDD(或 NewHadoopRDD)实例,并且它将访问 Hadoop API 以检索有关覆盖getPreferredLocations其基类中的函数的源文件位置的信息。
  2. 无法实现高局部性的主要原因,例如:PROCESS_LOCAL 或 NODE_LOCAL 是目标节点中资源不足。Spark 使用该设置spark.locality.wait来设置应做出有关局部性级别的决定的等待时间。Spark 将使用此设置等待特定时间以使资源可用。如果在间隔到期后节点上没有可用的资源(核心) ,则 Spark 将降级位置级别,例如:新的降级级别也会发生同样的情况,直到满足所需的资源规范。另一方面是一种升级方式spark.locality.waitPROCESS_LOCAL -> NODE_LOCAL任务位置是添加更多资源,例如:添加新的执行程序。此处找到的测试(第 915 行)演示了这种情况。默认值为 3 秒,如果您认为应该为任务分配更多时间,您可能会决定增加此值,尽管不建议这样做(可能会无效地增加 Spark 空闲时间)。
  3. 如果您的数据位于 Spark 集群之外,则位置级别将设置为 ANY。

我对改善局部性的最后建议是让 Spark 通过使用repartition() + persist() or cache().

注意:持久化将在第一次调用动作后生效。

有用的链接:

https://www.waitingforcode.com/apache-spark/spark-data-locality/read

http://www.russellspitzer.com/2017/09/01/Spark-Locality/

https://github.com/apache/spark/blob/0bb716bac38488bc216fbda29ce54e93751e641b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala


推荐阅读