首页 > 解决方案 > Apache Spark:爆炸导致随机溢出

问题描述

我有 40GB 的 csv 文件。读完之后,我需要进行一系列的转换。其中之一是爆炸一列。在转换之后,我得到了下图所示的 shuffle 溢出。我明白为什么会这样。爆炸基于广播变量查找,它给出了一组非常倾斜的结果。

我的问题是 - 我怎样才能减轻泄漏?我尝试explode通过调整spark.sql.shuffle.partitions配置参数在函数之前重新分区,以确保随机分区的大小相同,但这没有帮助。

任何有关该主题的建议或文献将不胜感激!

洒

标签: apache-spark

解决方案


如果您正在使用 RDD,您可以使用 Spark 的内置分区器来自定义跨分区的数据分布。您可以在 HashPartitioner 和 RangePartitioner 之间进行选择。两者都适用于离散值和连续值。

示例 HashPartitioner:

import org.apache.spark.HashPartitioner

val rdd = df.rdd // convert DataFrame to low-level RDD
val keyedRDD = rdd.keyBy(...) // define your custom key
keyedRDD.partitionBy(new HashPartitioner(n))

示例分区器:

import org.apache.spark.Partitioner

class DomainParitioner extends Partitioner {
  def numPartitions = n
  def getPartition(key: Any): Int = {
    // your custome partition logic
  }
}

keyedRDD.partitionBy(new DomainPartitioner).map(_._1).glom().map(_.toSet.toSeq.length)

推荐阅读