首页 > 解决方案 > spark shuffle write 超级慢

问题描述

为什么 1.6MB 随机写入和 2.4MB 输入的 spark shuffle 阶段如此缓慢?还有为什么随机写入只发生在一个执行程序上?我正在运行一个 3 节点集群,每个集群有 8 个内核。

火花用户界面:

在此处输入图像描述 在此处输入图像描述 代码:

*JavaPairRDD<String, String> javaPairRDD = c.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String arg0) throws Exception {
        // TODO Auto-generated method stub

        try {
            if (org.apache.commons.lang.StringUtils.isEmpty(arg0)) {
                return new Tuple2<String, String>("", "");
            }
            Tuple2<String, String> t = new Tuple2<String, String>(getESIndexName(arg0), arg0);
            return t;
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("******* exception in getESIndexName");
        }
        return new Tuple2<String, String>("", "");
    }
});

java.util.Map<String, Iterable<String>> map1 = javaPairRDD.groupByKey().collectAsMap();* 

标签: apache-sparkapache-spark-sql

解决方案


为什么随机写入只发生在一个执行器上:

请检查您的 RDD 分区,以下 UI 图像帮助您找到

在此处输入图像描述

我认为您的 RDD 只有一个分区,而不是 8 个或更多,最终会利用所有执行程序。

rdd = rdd.repartition(8) 

避免 Shuffle “阶段少,跑得快

洗牌是跨分区重新分配数据的过程(也称为重新分区),它可能会也可能不会导致跨 JVM 进程甚至通过网络(在不同机器上的执行程序之间)移动数据。

默认情况下,洗牌不会改变分区的数量,因为你只有一个分区,它看起来很慢。

如何避免洗牌:

  • 当两个 RDD 都有重复的 key 时,join 会导致数据的大小急剧膨胀。执行 distinct 或 combineByKey 操作以减少键空间或使用 cogroup 处理重复键而不是产生完整的叉积可能会更好。通过在合并步骤中使用智能分区,可以防止连接中的第二次混洗(我们将在后面详细讨论)。

  • 如果两个 RDD 中都不存在密钥,则可能会意外丢失数据。使用外连接会更安全,这样可以保证将所有数据保留在左 RDD 或右 RDD 中,然后在连接后过滤数据。

  • 如果一个 RDD 有一些易于定义的键子集,那么在另一个 RDD 中,您最好在连接之前进行过滤或减少,以避免数据的大混乱,无论如何您最终都会丢弃这些数据。

  • 为了连接数据,Spark 需要将要连接的数据(即基于每个键的数据)存在于同一个分区上。Spark 中连接的默认实现是随机散列连接。shuffled hash join 确保每个分区上的数据将包含相同的键,方法是使用与第一个相同的默认分区器对第二个数据集进行分区,以便来自两个数据集的具有相同哈希值的键位于同一分区中。虽然这种方法总是有效的,但它可能比必要的成本更高,因为它需要洗牌。在以下情况下可以避免洗牌:

    1.两个 RDD 都有一个已知的分区器。

    1. 其中一个数据集小到足以放入内存,在这种情况下,我们可以进行广播哈希连接(稍后我们将解释这是什么)。

请注意,如果 RDD 位于同一位置,则可以避免网络传输以及 shuffle。重新分区后始终坚持

  • DataFrame Joins 在 DataFrame 之间连接数据是最常见的多 DataFrame 转换之一。标准的 SQL 连接类型都支持,可以在执行连接时指定为 df.join(otherDf, sqlCondition, joinType) 中的 joinType。与 RDD 之间的连接一样,使用非唯一键连接将产生叉积(因此,如果左表有 R1 和 R2 和 key1,右表有 R3 和 R5 和 key1,你将得到 (R1, R3), (R1, R5), (R2, R3), (R2, R5)) 在输出中。

  • 使用自连接和 lit(true),您可以生成数据集的笛卡尔积,这可能很有用,但也说明了连接(尤其是自连接)如何很容易导致无法使用的数据大小。

  • 使用广播连接和广播连接,您可以非常有效地将大表(事实)与相对较小的表(维度)连接起来,避免通过网络发送大表的所有数据。您可以使用广播功能将数据集标记为在连接运算符中使用时广播。它使用 spark.sql.autoBroadcastJoinThreshold 设置来控制在执行连接时将广播到所有工作节点的表的大小。

  • 使用相同的分区程序。如果两个 RDD 具有相同的 partitioner,则 join 不会导致 shuffle。但是请注意,缺少 shuffle 并不意味着不必在节点之间移动数据。两个 RDD 可能具有相同的分区器(共同分区),但相应的分区位于不同的节点上(不位于同一位置)。这种情况仍然比洗牌要好,但要记住这一点。托管可以提高性能,但很难保证。

  • 如果数据量很大和/或您的集群无法增长甚至导致 OOM,请使用两遍方法。首先,重新分区数据并使用分区表(dataframe.write.partitionBy())进行持久化。然后,在一个循环中连续连接子分区,“附加”到同一个最终结果表。

  • https://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications

  • https://medium.com/@foundev/you-won-t-believe-how-spark-shuffling-will-probably-bite-you-also-windowing-e39d07bf754e
  • https://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/ - https://jaceklaskowski.gitbooks.io/mastering-apache-spark /spark-rdd-shuffle.html

推荐阅读