首页 > 技术文章 > SparkCore 对共享变量也提供了两种支持:1. 累加器 2. 广播变量

xingmeng63 2022-02-02 10:59 原文

正常情况下, 传递给 Spark 算子(比如: map, reduce 等)的函数都是在远程的集群节点上执行, 函数中用到的所有变量都是独立的拷贝.

这些变量被拷贝到集群上的每个节点上, 都这些变量的更改不会传递回驱动程序.

支持跨 task 之间共享变量通常是低效的, 但是 Spark 对共享变量也提供了两种支持:

  1. 累加器
  2. 广播变量

1.1 累加器(Accumulator)

累加器用来对信息进行聚合,通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,所以更新这些副本的值不会影响驱动器中的对应变量。

如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。

累加器是一种变量, 仅仅支持“add”, 支持并发. 累加器用于去实现计数器或者求和. Spark 内部已经支持数字类型的累加器, 开发者可以添加其他类型的支持.

 

object Test {

  def main(args: Array[String]): Unit = {


    val conf = new SparkConf().setMaster("local[4]").setAppName("MapPartitionsWithIndex")
    val sc = new SparkContext(conf)

    val list = List(10, 20, 5, 8, 36, 40, 36)
    val rdd1 = sc.parallelize(list, 2)

    val acc = sc.longAccumulator

    var a = 0

    val rdd2 = rdd1.map(x => {
      a = a+1
      acc.add(1)
      x

    })
    rdd2.collect
    println(a)
    println(acc.value)
    
    sc.stop()
    
  }
}

 

自定义累加器

通过继承类AccumulatorV2来自定义累加器.

下面这个累加器可以用于在程序运行过程中收集一些文本类信息,最终以List[String]的形式返回。

 

// 1、对什么值进行累加 2、累加器最终的值
class MyIntAcc extends AccumulatorV2[Int,Int]{}

 

 

 

 

1.2 广播变量

 

广播变量在每个节点上保存一个只读的变量的缓存, 而不用给每个 task 来传送一个 copy.

 

例如, 给每个节点一个比较大的输入数据集是一个比较高效的方法. Spark 也会用该对象的广播逻辑去分发广播变量来降低通讯的成本.

 

广播变量通过调用SparkContext.broadcast(v)来创建. 广播变量是对v的包装, 通过调用广播变量的 value方法可以访问.

 

 

说明:

  1. 通过对一个类型T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象。任何可序列化的类型都可以这么实现。
  2. 通过value属性访问该对象的值(在Java中为value()方法)。
  3. 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

 

推荐阅读