首页 > 解决方案 > Spark 如何使用 Hash Partitioner 分配数据?

问题描述

为了了解 Spark 分区的工作原理,我在 spark 1.6 上有以下代码

// Count size of partition for RDD[(String, Int)]
def countByPartition1(rdd: RDD[(String, Int)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

// Count size of partition for RDD[String]
def countByPartition2(rdd: RDD[String]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

// Case 1
val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1), ("aa", 1)), 8)
countByPartition1(rdd1).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)

// Case 2
val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
countByPartition2(rdd2).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)

在这两种情况下,数据都是均匀分布的。基于上述观察,我确实有以下问题:

  1. 在 rdd1 的情况下,哈希分区应该计算 key 的哈希码(即本例中的“aa”),所以所有记录都应该去单个分区而不是均匀分布?
  2. 在 rdd2 的情况下,没有键值对,那么哈希分区如何工作,即计算哈希码的关键是什么?

我也关注了@zero323 的回答,但找不到上述问题的答案。

标签: apache-sparkhashrddpartition

解决方案


在从读取文件或从驱动程序生成自己的此类初始分配中,没有应用像哈希这样的实际分区器。

如果您运行val p = rdd1.partitioner,您将看到该值None

  • 给定 V 或 (K,V) 格式的 RDD 的 N 个值 - 那么 V 或 (K,V) 格式实际上并不相关:

    • 那么对于 M 个分区,Spark 必须有一个算法来计算在哪里放置什么数据,否则它永远无法通过这一步,我们就无法继续工作!

      • 然后 Spark 将根据下一个整数 (M/N) 以相等的间隔放置等量的数据。

        • 因此,如果我有 10 个分区的 4 个值,则每个 (10 / 2.5) 步的更高整数 Spark 将放置数据。这就是你所看到的。这同样适用于具有 8 个分区的 4 个值 - 如您所见。

尚未应用散列。边缘情况是初始分配。对于驱动程序创建的 RDD,这是它的工作方式,对于基于文件的源有点不同。


推荐阅读