从执行结果上看,结果相同,但是mapPartitions的速度比map快了一些。这是为什么呢?我们看下源码
1 /** 2 * Return a new RDD by applying a function to all elements of this RDD. 3 */ 4 def map[U: ClassTag](f: T => U): RDD[U] = withScope { 5 val cleanF = sc.clean(f) 6 new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) 7 }
1 /** 2 * Return a new RDD by applying a function to each partition of this RDD. 3 * 4 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which 5 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. 6 */ 7 def mapPartitions[U: ClassTag]( 8 f: Iterator[T] => Iterator[U], 9 preservesPartitioning: Boolean = false): RDD[U] = withScope { 10 val cleanedF = sc.clean(f) 11 new MapPartitionsRDD( 12 this, 13 (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), 14 preservesPartitioning) 15 }
首先看方法总结上就可以看出,map是作用于RDD的每个元素,而mapPartitons则是作用于RDD的每个分区。
同时mapPartitons内的要求传入的方法f的作用对象是scala的Iterator,所以上例中的x是Iterator,
x调用的map是scala语言的map,与第一个RDD调用的map方法不同,x调用的map是scala语言的map时,会把每个分区的数据都读取到内存,更耗内存,也更快。
假设有 N 个元素, 有 M 个分区,对于mappartitions,那么 map 的函数的将被调用 N 次,而 mapPartitions 被调用 M 次,一个函数一次处理所有
分区。