首页 > 解决方案 > 在 Spark 中,RDD 是不可变的,那么 Accumulators 是如何实现的呢?

问题描述

弹性分布式数据集 (RDD) 被设计为不可变的。使它们不可变的原因之一在于容错和避免,因为它们由许多进程和可能同时由许多节点处理。这可以避免竞争条件,也可以避免试图控制这些条件所涉及的开销。

关于如何实现 RDD 有几个说明(例如,这个)。但是,我似乎找不到一个说明累加器是如何实现的。它位于Apache Spark 文档中的 RDD 部分。这是否意味着值的每个增量都会创建一个新的 RDD,还是完全不同的数据结构?

标签: pythonscalaapache-sparkrdd

解决方案


Accumulator是执行器的只写变量。它们可以由执行器添加,并且只能由驱动程序读取。

executors and read by the driver only.
executor1: accumulator.add(incByExecutor1)
executor2: accumulator.add(incByExecutor2)

driver:  println(accumulator.value)

Accumulators不是线程安全的。他们实际上不必这样做,因为驱动程序用于在任务完成(成功或失败)后更新累加器值的DAGScheduler.updateAccumulators方法仅在运行调度循环的单个线程上执行。除此之外,对于具有自己的本地累加器引用的工作人员来说,它们是只写数据结构,而只有驱动程序才允许访问累加器的值。
累加器是可序列化的,因此可以在执行器中执行的代码中安全地引用它们,然后通过网络安全地发送执行。

val counter = sc.longAccumulator("counter")
sc.parallelize(1 to 9).foreach(x => counter.add(x))

推荐阅读