scala - 在 apache spark 生产场景中处理 Skew 数据
问题描述
谁能解释一下 Apache spark 在生产中如何处理偏斜数据?
设想:
我们使用“spark-submit”提交了 spark 作业,在 spark-ui 中观察到很少有任务需要很长时间,这表明存在倾斜。
问题:
(1) 我们应该采取哪些步骤(重新分区、合并等)?
(2)我们是否需要终止作业,然后将倾斜解决方案包含在 jar 中并重新提交作业?
(3)我们可以通过直接从shell运行(coalesce)等命令来解决这个问题而不杀死工作吗?
解决方案
在应用非归约键(改组)操作时,数据倾斜主要是一个问题。最常见的两个例子是:
- 非还原
groupByKey
(RDD.groupByKey
,Dataset.groupBy(Key).mapGroups
,Dataset.groupBy.agg(collect_list)
)。 RDD
和Dataset
joins
。
很少,问题与分区键和分区函数的属性有关,数据分布不存在永久问题。
// All keys are unique - no obvious data skew
val rdd = sc.parallelize(Seq(0, 3, 6, 9, 12)).map((_, None))
// Drastic data skew
rdd.partitionBy(new org.apache.spark.HashPartitioner(3)).glom.map(_.size).collect
// Array[Int] = Array(5, 0, 0)
我们应该采取哪些步骤(重新分区、合并等)?
重新分区(从不coalesce
)可以帮助您解决后一种情况
- 更改分区程序。
- 调整分区数量以最小化数据的可能影响(在这里您可以使用与关联数组相同的规则 - 应首选素数和 2 的幂,尽管可能无法完全解决问题,如上面使用的示例中的 3)。
前一种情况通常不会从重新分区中受益很多,因为偏斜是由操作本身自然引起的。具有相同键的值不能分布在多个分区中,并且过程的非归约特性受初始数据分布的影响最小。
这些情况必须通过调整应用程序的逻辑来处理。这在实践中可能意味着很多事情,具体取决于数据或问题:
我们是否需要终止作业,然后在 jar 中包含偏斜解决方案并重新提交作业?
通常,您至少必须使用调整参数重新提交作业。
在某些情况下(主要是RDD
批处理作业),您可以设计您的应用程序,以监控任务执行并在可能出现偏差的情况下终止并重新提交特定作业,但在实践中可能很难正确实施。
一般来说,如果数据倾斜是可能的,您应该将您的应用程序设计为不受数据倾斜的影响。
我们可以通过直接从 shell 运行 (coalesce) 之类的命令来解决这个问题而不杀死工作吗?
我相信以上几点已经回答了这个问题,但只是说 - Spark 中没有这样的选项。您当然可以将这些包含在您的应用程序中。
推荐阅读
- python - 打开和读取没有库的 CSV 文件
- r - R Shiny,如何根据selectinput小部件结果在shinyapp中使用highcharts向下钻取?
- mysql - Mysql - 查询优化
- heroku - 为什么我的 Knex 迁移在 Heroku 上失败,但不是在本地?
- android - 我如何将重力设置为正确的行recyclerview
- java - Java>=8 在带有 instanceof 实例的静态上下文中调用接口上的默认方法?
- excel - 如何在 VBA 中授予对 excel 文件的访问权限
- twilio - 从 Freshdesk Automation webhook 触发 Twilio Flow
- java - 从 Json 获取房间 ID
- typescript - 如何通过 TypeScript 中的封装来保护和数组不被更改?