首页 > 解决方案 > 在 apache spark 生产场景中处理 Skew 数据

问题描述

谁能解释一下 Apache spark 在生产中如何处理偏斜数据?

设想:

我们使用“spark-submit”提交了 spark 作业,在 spark-ui 中观察到很少有任务需要很长时间,这表明存在倾斜。

问题:

(1) 我们应该采取哪些步骤(重新分区、合并等)?

(2)我们是否需要终止作业,然后将倾斜解决方案包含在 jar 中并重新提交作业?

(3)我们可以通过直接从shell运行(coalesce)等命令来解决这个问题而不杀死工作吗?

标签: scalaapache-spark

解决方案


在应用非归约键(改组)操作时,数据倾斜主要是一个问题。最常见的两个例子是:

  • 非还原groupByKey( RDD.groupByKey, Dataset.groupBy(Key).mapGroups, Dataset.groupBy.agg(collect_list))。
  • RDDDataset joins

很少,问题与分区键和分区函数的属性有关,数据分布不存在永久问题。

// All keys are unique - no obvious data skew
val rdd = sc.parallelize(Seq(0, 3, 6, 9, 12)).map((_, None))

// Drastic data skew
rdd.partitionBy(new org.apache.spark.HashPartitioner(3)).glom.map(_.size).collect
// Array[Int] = Array(5, 0, 0)

我们应该采取哪些步骤(重新分区、合并等)?

重新分区(从不coalesce)可以帮助您解决后一种情况

  • 更改分区程序。
  • 调整分区数量以最小化数据的可能影响(在这里您可以使用与关联数组相同的规则 - 应首选素数和 2 的幂,尽管可能无法完全解决问题,如上面使用的示例中的 3)。

前一种情况通常不会从重新分区中受益很多,因为偏斜是由操作本身自然引起的。具有相同键的值不能分布在多个分区中,并且过程的非归约特性受初始数据分布的影响最小。

这些情况必须通过调整应用程序的逻辑来处理。这在实践中可能意味着很多事情,具体取决于数据或问题:

  • 完全删除操作。
  • 用近似值代替精确结果。
  • 使用不同的解决方法(通常使用连接),例如频繁-不频繁拆分迭代广播连接或使用概率过滤器(如布隆过滤器)进行预过滤。

我们是否需要终止作业,然后在 jar 中包含偏斜解决方案并重新提交作业?

通常,您至少必须使用调整参数重新提交作业。

在某些情况下(主要是RDD批处理作业),您可以设计您的应用程序,以监控任务执行并在可能出现偏差的情况下终止并重新提交特定作业,但在实践中可能很难正确实施。

一般来说,如果数据倾斜是可能的,您应该将您的应用程序设计为不受数据倾斜的影响。

我们可以通过直接从 shell 运行 (coalesce) 之类的命令来解决这个问题而不杀死工作吗?

我相信以上几点已经回答了这个问题,但只是说 - Spark 中没有这样的选项。您当然可以将这些包含在您的应用程序中。


推荐阅读