首页 > 解决方案 > Spark 性能:本地比集群快(非常不均匀的执行器负载)

问题描述

让我首先说我对火花比较陌生,所以如果我说的一些没有意义的话,请纠正我。

总结这个问题,不管我做什么,在某些阶段,一个执行器完成所有计算,这使得集群执行比本地的单处理器执行慢。

全文:我编写了一个 spark 1.6 应用程序,它由一系列映射、过滤器、连接和一个简短的 graphx 部分组成。该应用程序仅使用一个数据源 - csv 文件。出于开发目的,我创建了一个模型数据集,包含 100 000 行,7MB,所有字段都具有均匀分布的随机数据(文件中也是随机排序)。连接是 PairRDD 上各个字段的自内连接(数据集有重复的键,每个键有约 200 个重复项,模仿真实数据),导致键内的笛卡尔积。然后我对连接的结果执行一些映射和过滤操作,将其存储为一些自定义类对象的 RDD,并将所有内容保存为 and 处的图形。

我在笔记本电脑上开发代码并运行它,大约需要 5 分钟(Windows 机器,本地文件)。令我惊讶的是,当我将 jar 部署到集群(主纱线、集群模式、HDFS 中的 csv 文件)并提交时,代码执行了 8 分钟。我用较小的数据进行了相同的实验,结果在本地是 40 秒,在集群上是 1.1 分钟。

当我查看历史服务器时,我发现 2 个阶段特别长(每个阶段几乎 4 分钟),并且在这些阶段中,有一个任务需要 > 90% 的时间。我多次运行代码,它总是花费这么多时间的同一个任务,即使它每次都部署在不同的数据节点上。

令我惊讶的是,当我打开执行程序时,我看到一个执行程序几乎完成了所有工作(就花费的时间而言)并执行了大多数工作。在提供的屏幕截图中,第二个最“活跃”的执行者有 50 个任务,但情况并非总是如此——在不同的提交中,第二个最繁忙的执行者有 15 个任务,而领先的一个有 95 个)。 执行者

此外,我看到计算使用了 3.9 分钟的时间(第二张截图),这在 map 后不久的连接数据上最为繁重。我认为,数据可能不会被平均划分,一个执行者必须执行所有计算。因此,我尝试在加入之前(类似的执行时间)或加入之后(执行更慢)手动(使用 .partitionBy(new HashPartitioner(40)))对 pairRdd 进行分配。 在此处输入图像描述

可能是什么问题?任何帮助将不胜感激。

标签: scalaapache-spark

解决方案


如果不查看您的查询并了解您的数据集,很难说清楚,我猜您也没有包含它,因为它非常复杂或敏感?所以这有点像在黑暗中拍摄,但这看起来很像我们在工作中处理的问题。我对正在发生的事情的粗略猜测是,在您的一次联接中,您有一个具有高基数但分布非常不均匀的键空间。在我们的案例中,我们加入了网络流量来源,虽然我们有数千种可能的流量来源,但绝大多数流量都来自少数几个。这在我们加入时引起了问题。密钥将在执行器之间平均分配,但是由于可能 95% 的数据共享可能是 3 或 4 个密钥,因此只有极少数的执行器完成了大部分工作。当您发现一个存在此问题的连接时,要做的是选择两个数据集中较小的一个,并显式执行广播连接。(Spark 通常会尝试这样做,但它并不总是能够完美地判断何时应该这样做。)

为此,假设您有两个 DataFrame。其中一个有两列,number其中stringRepnumber 只是所有整数的一行,0-10000并且stringRep只是它的字符串表示,所以“一”、“二”、“三”等。我们称之为numToString

另一个 DataFrame 有一些要加入number的关键列numToString被调用kind,一些其他不相关的数据和 100,000,000 行。我们称之为 DataFrame ourData。然后假设 100,000,000 行的分布ourData是 90% have kind == 1, 5% have kind == 2,剩下的 5% 在剩余的 99,998 个数字中分布得相当均匀。当您执行以下代码时:

val numToString: DataFrame = loadNumToString()
val ourData: DataFrame = loadOurCode()

val joined = ourData.join(numToString).where(ourData("kind") === numToString("number"))

...Spark 很可能会将 %90 的数据(具有 的数据kind == 1)发送到一个执行程序,将 %5 的数据(具有 的数据kind == 2)发送到另一个执行程序,剩余的 %5 会涂抹在其余部分,留下两个 executor 有巨大的分区,其余的分区非常小。

正如我之前提到的,解决这个问题的方法是显式执行广播连接。这样做是获取一个 DataFrame 并将其完全分发到每个节点。所以你会这样做:

val joined = ourData.join(broadcast(numToString)).where(ourData("kind") === numToString("number"))

...这将发送numToString给每个执行者。假设ourData事先均匀分区,数据应该在执行器之间保持均匀分区。这可能不是您的问题,但听起来确实很像我们遇到的问题。希望能帮助到你!

有关广播连接的更多信息,请参见: https ://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html


推荐阅读