scala - Spark 性能:本地比集群快(非常不均匀的执行器负载)
问题描述
让我首先说我对火花比较陌生,所以如果我说的一些没有意义的话,请纠正我。
总结这个问题,不管我做什么,在某些阶段,一个执行器完成所有计算,这使得集群执行比本地的单处理器执行慢。
全文:我编写了一个 spark 1.6 应用程序,它由一系列映射、过滤器、连接和一个简短的 graphx 部分组成。该应用程序仅使用一个数据源 - csv 文件。出于开发目的,我创建了一个模型数据集,包含 100 000 行,7MB,所有字段都具有均匀分布的随机数据(文件中也是随机排序)。连接是 PairRDD 上各个字段的自内连接(数据集有重复的键,每个键有约 200 个重复项,模仿真实数据),导致键内的笛卡尔积。然后我对连接的结果执行一些映射和过滤操作,将其存储为一些自定义类对象的 RDD,并将所有内容保存为 and 处的图形。
我在笔记本电脑上开发代码并运行它,大约需要 5 分钟(Windows 机器,本地文件)。令我惊讶的是,当我将 jar 部署到集群(主纱线、集群模式、HDFS 中的 csv 文件)并提交时,代码执行了 8 分钟。我用较小的数据进行了相同的实验,结果在本地是 40 秒,在集群上是 1.1 分钟。
当我查看历史服务器时,我发现 2 个阶段特别长(每个阶段几乎 4 分钟),并且在这些阶段中,有一个任务需要 > 90% 的时间。我多次运行代码,它总是花费这么多时间的同一个任务,即使它每次都部署在不同的数据节点上。
令我惊讶的是,当我打开执行程序时,我看到一个执行程序几乎完成了所有工作(就花费的时间而言)并执行了大多数工作。在提供的屏幕截图中,第二个最“活跃”的执行者有 50 个任务,但情况并非总是如此——在不同的提交中,第二个最繁忙的执行者有 15 个任务,而领先的一个有 95 个)。
此外,我看到计算使用了 3.9 分钟的时间(第二张截图),这在 map 后不久的连接数据上最为繁重。我认为,数据可能不会被平均划分,一个执行者必须执行所有计算。因此,我尝试在加入之前(类似的执行时间)或加入之后(执行更慢)手动(使用 .partitionBy(new HashPartitioner(40)))对 pairRdd 进行分配。
可能是什么问题?任何帮助将不胜感激。
解决方案
如果不查看您的查询并了解您的数据集,很难说清楚,我猜您也没有包含它,因为它非常复杂或敏感?所以这有点像在黑暗中拍摄,但这看起来很像我们在工作中处理的问题。我对正在发生的事情的粗略猜测是,在您的一次联接中,您有一个具有高基数但分布非常不均匀的键空间。在我们的案例中,我们加入了网络流量来源,虽然我们有数千种可能的流量来源,但绝大多数流量都来自少数几个。这在我们加入时引起了问题。密钥将在执行器之间平均分配,但是由于可能 95% 的数据共享可能是 3 或 4 个密钥,因此只有极少数的执行器完成了大部分工作。当您发现一个存在此问题的连接时,要做的是选择两个数据集中较小的一个,并显式执行广播连接。(Spark 通常会尝试这样做,但它并不总是能够完美地判断何时应该这样做。)
为此,假设您有两个 DataFrame。其中一个有两列,number
其中stringRep
number 只是所有整数的一行,0-10000
并且stringRep
只是它的字符串表示,所以“一”、“二”、“三”等。我们称之为numToString
另一个 DataFrame 有一些要加入number
的关键列numToString
被调用kind
,一些其他不相关的数据和 100,000,000 行。我们称之为 DataFrame ourData
。然后假设 100,000,000 行的分布ourData
是 90% have kind == 1
, 5% have kind == 2
,剩下的 5% 在剩余的 99,998 个数字中分布得相当均匀。当您执行以下代码时:
val numToString: DataFrame = loadNumToString()
val ourData: DataFrame = loadOurCode()
val joined = ourData.join(numToString).where(ourData("kind") === numToString("number"))
...Spark 很可能会将 %90 的数据(具有 的数据kind == 1
)发送到一个执行程序,将 %5 的数据(具有 的数据kind == 2
)发送到另一个执行程序,剩余的 %5 会涂抹在其余部分,留下两个 executor 有巨大的分区,其余的分区非常小。
正如我之前提到的,解决这个问题的方法是显式执行广播连接。这样做是获取一个 DataFrame 并将其完全分发到每个节点。所以你会这样做:
val joined = ourData.join(broadcast(numToString)).where(ourData("kind") === numToString("number"))
...这将发送numToString
给每个执行者。假设ourData
事先均匀分区,数据应该在执行器之间保持均匀分区。这可能不是您的问题,但听起来确实很像我们遇到的问题。希望能帮助到你!
有关广播连接的更多信息,请参见: https ://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html
推荐阅读
- python - 在 python 中尝试“import epipy”或“from epipy import *”时出错(使用 pycharm IDE)
- html - How to change ::before SVG background-image color on hover, focus, active
- batch-file - FTP upload not working with Windows 7 scheduler
- android - cant launch coroutine builder
- python - 如何在 Google Colaboratory Python Notebook 中使用 Flask?
- azure-active-directory - 仅提示第二因素通知
- api - GetResponse API v3: GET /contacts does not return all contacts
- jdbc - Taurus 无法加载 JDBC 驱动程序
- windows - 如何在 Windows 上的 cygwin 的帮助下从命令行运行 shell 脚本?
- rest - Google Classroom Rest API 在公告中修补“文本”不起作用