首页 > 解决方案 > 如何将一些数据关联到spark中的每个分区并重新使用它?

问题描述

我有一个分区的 rdd,我想从每个分区中提取一些数据,以便以后可以重新使用它。过度简化可能是:

val rdd = sc.parallelize(Seq("1-a", "2-b", "3-c"), 3)
val mappedRdd = rdd.mapPartitions{ dataIter => 
    val bufferedIter = dataIter.buffered  

    //extract data which we want to re-use inside each partition    
    val reusableData = bufferedIter.head.charAt(0) 

    //use that data and return (but this does not allow me to re-use it) 
    bufferedIter.map(_ +  reusableData)
}

我的解决方案是在 rdd 中提取可重用数据:

val reusableDataRdd = rdd.mapPartitions { dataIter => 
    //return an iterator with only one item on each partition
    Iterator(dataIter.buffered.head.charAt(0))
}

然后压缩分区

rdd.zipPartitions(reusableDataRdd){(dataIter, reusableDataIter) => 
    val reusableData = reusableDataIter.next
    dataIter.map(_ + reusableData)
}

我会得到相同的结果,mappedRdd但我也会得到我的可重用数据 rdd。

是否有更好的选择来提取和重用数据?也许更优雅或更优化?

标签: scalaapache-sparkrddapache-spark-2.0

解决方案


推荐阅读