apache-spark - spark shuffle write 超级慢
问题描述
为什么 1.6MB 随机写入和 2.4MB 输入的 spark shuffle 阶段如此缓慢?还有为什么随机写入只发生在一个执行程序上?我正在运行一个 3 节点集群,每个集群有 8 个内核。
火花用户界面:
*JavaPairRDD<String, String> javaPairRDD = c.mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String arg0) throws Exception {
// TODO Auto-generated method stub
try {
if (org.apache.commons.lang.StringUtils.isEmpty(arg0)) {
return new Tuple2<String, String>("", "");
}
Tuple2<String, String> t = new Tuple2<String, String>(getESIndexName(arg0), arg0);
return t;
} catch (Exception e) {
e.printStackTrace();
System.out.println("******* exception in getESIndexName");
}
return new Tuple2<String, String>("", "");
}
});
java.util.Map<String, Iterable<String>> map1 = javaPairRDD.groupByKey().collectAsMap();*
解决方案
为什么随机写入只发生在一个执行器上:
请检查您的 RDD 分区,以下 UI 图像帮助您找到
我认为您的 RDD 只有一个分区,而不是 8 个或更多,最终会利用所有执行程序。
rdd = rdd.repartition(8)
洗牌是跨分区重新分配数据的过程(也称为重新分区),它可能会也可能不会导致跨 JVM 进程甚至通过网络(在不同机器上的执行程序之间)移动数据。
默认情况下,洗牌不会改变分区的数量,因为你只有一个分区,它看起来很慢。
如何避免洗牌:
当两个 RDD 都有重复的 key 时,join 会导致数据的大小急剧膨胀。执行 distinct 或 combineByKey 操作以减少键空间或使用 cogroup 处理重复键而不是产生完整的叉积可能会更好。通过在合并步骤中使用智能分区,可以防止连接中的第二次混洗(我们将在后面详细讨论)。
如果两个 RDD 中都不存在密钥,则可能会意外丢失数据。使用外连接会更安全,这样可以保证将所有数据保留在左 RDD 或右 RDD 中,然后在连接后过滤数据。
如果一个 RDD 有一些易于定义的键子集,那么在另一个 RDD 中,您最好在连接之前进行过滤或减少,以避免数据的大混乱,无论如何您最终都会丢弃这些数据。
为了连接数据,Spark 需要将要连接的数据(即基于每个键的数据)存在于同一个分区上。Spark 中连接的默认实现是随机散列连接。shuffled hash join 确保每个分区上的数据将包含相同的键,方法是使用与第一个相同的默认分区器对第二个数据集进行分区,以便来自两个数据集的具有相同哈希值的键位于同一分区中。虽然这种方法总是有效的,但它可能比必要的成本更高,因为它需要洗牌。在以下情况下可以避免洗牌:
1.两个 RDD 都有一个已知的分区器。
- 其中一个数据集小到足以放入内存,在这种情况下,我们可以进行广播哈希连接(稍后我们将解释这是什么)。
请注意,如果 RDD 位于同一位置,则可以避免网络传输以及 shuffle。重新分区后始终坚持
DataFrame Joins 在 DataFrame 之间连接数据是最常见的多 DataFrame 转换之一。标准的 SQL 连接类型都支持,可以在执行连接时指定为 df.join(otherDf, sqlCondition, joinType) 中的 joinType。与 RDD 之间的连接一样,使用非唯一键连接将产生叉积(因此,如果左表有 R1 和 R2 和 key1,右表有 R3 和 R5 和 key1,你将得到 (R1, R3), (R1, R5), (R2, R3), (R2, R5)) 在输出中。
使用自连接和 lit(true),您可以生成数据集的笛卡尔积,这可能很有用,但也说明了连接(尤其是自连接)如何很容易导致无法使用的数据大小。
使用广播连接和广播连接,您可以非常有效地将大表(事实)与相对较小的表(维度)连接起来,避免通过网络发送大表的所有数据。您可以使用广播功能将数据集标记为在连接运算符中使用时广播。它使用 spark.sql.autoBroadcastJoinThreshold 设置来控制在执行连接时将广播到所有工作节点的表的大小。
使用相同的分区程序。如果两个 RDD 具有相同的 partitioner,则 join 不会导致 shuffle。但是请注意,缺少 shuffle 并不意味着不必在节点之间移动数据。两个 RDD 可能具有相同的分区器(共同分区),但相应的分区位于不同的节点上(不位于同一位置)。这种情况仍然比洗牌要好,但要记住这一点。托管可以提高性能,但很难保证。
如果数据量很大和/或您的集群无法增长甚至导致 OOM,请使用两遍方法。首先,重新分区数据并使用分区表(dataframe.write.partitionBy())进行持久化。然后,在一个循环中连续连接子分区,“附加”到同一个最终结果表。
https://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications
- https://medium.com/@foundev/you-won-t-believe-how-spark-shuffling-will-probably-bite-you-also-windowing-e39d07bf754e
- https://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/ - https://jaceklaskowski.gitbooks.io/mastering-apache-spark /spark-rdd-shuffle.html
推荐阅读
- java - 关于为什么 ArrayList 在这里不起作用但 Hashset 起作用的问题
- python-3.x - 如何使用 copy_from 将带有日期时间的 csv 加载到 postgresql 中
- python - 在pandas和Python中iloc函数有什么优势
- c# - while 循环或 thread.Sleep 是否会阻止可能引发的事件?
- python - 删除列表的内括号
- node.js - 如何使用 node.js 从 .cer 文件中提取公钥?
- netbeans - 在 Netbean 中启用工具栏上的对齐按钮
- php - Laraver 问题在刀片视图中显示存储文件的 URL
- vue.js - 如何在 fullcalendar 5 中从 Vuex 获取数据到 dayCellDidMount?
- flutter - 有什么方法可以将颤振中的徽章对齐到最右边?