apache-spark - Spark 如何使用 Hash Partitioner 分配数据?
问题描述
为了了解 Spark 分区的工作原理,我在 spark 1.6 上有以下代码
// Count size of partition for RDD[(String, Int)]
def countByPartition1(rdd: RDD[(String, Int)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
// Count size of partition for RDD[String]
def countByPartition2(rdd: RDD[String]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
// Case 1
val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1), ("aa", 1)), 8)
countByPartition1(rdd1).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
// Case 2
val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
countByPartition2(rdd2).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
在这两种情况下,数据都是均匀分布的。基于上述观察,我确实有以下问题:
- 在 rdd1 的情况下,哈希分区应该计算 key 的哈希码(即本例中的“aa”),所以所有记录都应该去单个分区而不是均匀分布?
- 在 rdd2 的情况下,没有键值对,那么哈希分区如何工作,即计算哈希码的关键是什么?
我也关注了@zero323 的回答,但找不到上述问题的答案。
解决方案
在从读取文件或从驱动程序生成自己的此类初始分配中,没有应用像哈希这样的实际分区器。
如果您运行val p = rdd1.partitioner
,您将看到该值None
。
给定 V 或 (K,V) 格式的 RDD 的 N 个值 - 那么 V 或 (K,V) 格式实际上并不相关:
那么对于 M 个分区,Spark 必须有一个算法来计算在哪里放置什么数据,否则它永远无法通过这一步,我们就无法继续工作!
然后 Spark 将根据下一个整数 (M/N) 以相等的间隔放置等量的数据。
- 因此,如果我有 10 个分区的 4 个值,则每个 (10 / 2.5) 步的更高整数 Spark 将放置数据。这就是你所看到的。这同样适用于具有 8 个分区的 4 个值 - 如您所见。
尚未应用散列。边缘情况是初始分配。对于驱动程序创建的 RDD,这是它的工作方式,对于基于文件的源有点不同。
推荐阅读
- navigation - 如何从 gps ned 计算速度矢量
- tensorflow - Keras模型中的双三次块
- php - 二进制数据可以在没有base64的情况下存储在Redis Hashes中吗?(通过phpredis)
- html - 使其具有响应性
- google-sheets - Google 表格平均了一系列形式为 MM:SS[.\d+] 的持续时间
- javascript - 2个websockets之间的冲突
- laravel - Laravel Eloquent 获取数据(有关系)的困惑
- javascript - 使用正则表达式检查管理员和用户的输入
- gradle - 依赖问题
- angular - Angular:如何自定义 Canvas 条形图标签?