scala - 广播有关 Spark 作业的更新
问题描述
我在这里看到过这个问题,但他们基本上专注于火花流,我找不到适合批量工作的解决方案。这个想法是循环几天,并在每次迭代/天更新有关前一天的信息(用于当前迭代)。代码如下所示:
var prevIterDataRdd = // some RDD
days.foreach(folder => {
val previousData : Map[String, Double] = parseResult(prevIterDataRdd)
val broadcastMap = sc.broadcast(previousData)
val (result, previousStatus) =
processFolder(folder, broadcastMap)
// store result
result.write.csv(outputPath)
// updating the RDD that enables me to extract previousData to update broadcast
val passingPrevStatus = prevIterDataRdd.subtractByKey(previousStatus)
prevIterDataRdd = previousStatus.union(passingPrevStatus)
broadcastMap.unpersist(true)
broadcastMap.destroy()
})
UsingbroadcastMap.destroy()
不运行,因为它不允许我再次使用 broadcastMap (我实际上不明白,因为它应该是完全不相关的 - 不可变的)。
我应该如何运行这个循环并在每次迭代时更新广播变量?
使用方法时,unpersist
我传递true
参数以使其阻塞。sc.broadcast()
也堵?
unpersist()
如果我再次广播,我真的需要吗?
destroy
既然我正在创建一个新的广播变量,为什么我在使用后不能再次使用广播?
解决方案
广播变量是不可变的,但您可以创建一个新的广播变量。这个新的广播变量可以在下一次迭代中使用。
您需要做的就是更改对新创建的广播的引用,从执行程序中取消保留旧广播并从驱动程序中销毁它。
在类级别定义变量,这将允许您更改驱动程序中广播变量的引用并使用销毁方法。
object Main extends App {
// defined and initialized at class level to allow reference change
var previousData: Map[String, Double] = null
override def main(args: Array[String]): Unit = {
//your code
}
}
不允许对变量使用 destroy 方法,因为驱动程序中不再存在引用。更改对新广播变量的引用可以解决该问题。
Unpersist 仅从执行程序中删除数据,因此当重新访问变量时,驱动程序会将其重新发送给执行程序。
blocking = true
将允许您让应用程序在下次访问之前从执行程序中完全删除数据。
sc.broadcast()
-没有官方文件说它是阻塞的。尽管一旦调用它,应用程序就会在运行下一行代码之前开始将数据广播给执行程序。因此,如果数据非常大,它可能会减慢您的应用程序。所以要小心你是如何使用它的。
在销毁之前调用 unpersist是一个很好的做法。这将帮助您完全摆脱执行程序和驱动程序中的数据。
推荐阅读
- php - 通过 MySQL 进行并发分配
- angular - 为什么我的 http 请求取消/停止 observables 流?
- python - Python:时间/帧转换计算器
- javascript - 如何匹配具有两个不同单词的所有行?
- documentation - 我如何将我的项目中的代码呈现到我的论文中?
- css - 类型“字符串”不可分配给类型“位置”
- javascript - 在我的页面和 iframe 之间进行通信并相互发送数据
- xcode - XCode - 如何创建捆绑的 CLI 工具应用程序
- flutter - Dart:无法添加 _InternalLinkedHashMap
到列表 - c# - 如何从另一个表单刷新文本框?C#