python - 在 Spark 中,RDD 是不可变的,那么 Accumulators 是如何实现的呢?
问题描述
弹性分布式数据集 (RDD) 被设计为不可变的。使它们不可变的原因之一在于容错和避免,因为它们由许多进程和可能同时由许多节点处理。这可以避免竞争条件,也可以避免试图控制这些条件所涉及的开销。
关于如何实现 RDD 有几个说明(例如,这个)。但是,我似乎找不到一个说明累加器是如何实现的。它位于Apache Spark 文档中的 RDD 部分。这是否意味着值的每个增量都会创建一个新的 RDD,还是完全不同的数据结构?
解决方案
Accumulator
是执行器的只写变量。它们可以由执行器添加,并且只能由驱动程序读取。
executors and read by the driver only.
executor1: accumulator.add(incByExecutor1)
executor2: accumulator.add(incByExecutor2)
driver: println(accumulator.value)
Accumulators
不是线程安全的。他们实际上不必这样做,因为驱动程序用于在任务完成(成功或失败)后更新累加器值的DAGScheduler.updateAccumulators方法仅在运行调度循环的单个线程上执行。除此之外,对于具有自己的本地累加器引用的工作人员来说,它们是只写数据结构,而只有驱动程序才允许访问累加器的值。
累加器是可序列化的,因此可以在执行器中执行的代码中安全地引用它们,然后通过网络安全地发送执行。
val counter = sc.longAccumulator("counter")
sc.parallelize(1 to 9).foreach(x => counter.add(x))
推荐阅读
- r - 主成分回归中的“成分”是什么意思?
- autotools - 从 configure.ac 修改 $PATH
- mongodb - 如何为 mongodb 创建一个新字段,就像 SQL 中的新属性一样
- c# - 为什么 DataAvailable 在 5 秒后停止被调用?
- javascript - 如何在javascript中使用点?
- python - 在日期时间创建模式
- r - 如何在自定义环境中获取所有数据帧的 nrow
- java - 创建 Stripe Charge 时出现 NoSuchMethodError
- python - 当您不是管理员时如何在 slurm-cluster 中执行 sudo 命令
- http - 使用 Cloudflare 时的 HTTP 状态代码 304