scala - 我需要持久化不断更新的 RDD 吗?
问题描述
我正在使用一个火花程序,它需要在循环中不断更新一些 RDD:
var totalRandomPath: RDD[String] = null
for (iter <- 0 until config.numWalks) {
var randomPath: RDD[String] = examples.map { case (nodeId, clickNode) =>
clickNode.path.mkString("\t")
}
for (walkCount <- 0 until config.walkLength) {
randomPath = edge2attr.join(randomPath.mapPartitions { iter =>
iter.map { pathBuffer =>
val paths: Array[String] = pathBuffer.split("\t")
(paths.slice(paths.size - 2, paths.size).mkString(""), pathBuffer)
}
}).mapPartitions { iter =>
iter.map { case (edge, (attr, pathBuffer)) =>
try {
if (pathBuffer != null && pathBuffer.nonEmpty && attr.dstNeighbors != null && attr.dstNeighbors.nonEmpty) {
val nextNodeIndex: PartitionID = GraphOps.drawAlias(attr.J, attr.q)
val nextNodeId: VertexId = attr.dstNeighbors(nextNodeIndex)
s"$pathBuffer\t$nextNodeId"
} else {
pathBuffer //add
}
} catch {
case e: Exception => throw new RuntimeException(e.getMessage)
}
}.filter(_ != null)
}
}
if (totalRandomPath != null) {
totalRandomPath = totalRandomPath.union(randomPath)
} else {
totalRandomPath = randomPath
}
}
在这个程序中,RDDtotalRandomPath
和randomPath
不断更新,有很多转换操作:join
和mapPartitions
. 该计划将以行动结束collect
。
那么我需要坚持那些不断更新的 RDD(totalRandomPath,randomPath)来加速我的 spark 程序吗?
而且我注意到这个程序在单节点机器上运行很快,但在三节点集群中运行时速度变慢,为什么会发生这种情况?
解决方案
是的,您需要保留更新的 RDD 并取消保留旧的 RDD
var totalRandomPath:RDD[String] = spark.sparkContext.parallelize(List.empty[String]).cache()
for (iter <- 0 until config.numWalks){
// existing logic
val tempRDD = totalRandomPath.union(randomPath).cache()
tempRDD foreach { _ => } //this will trigger cache operation for tempRDD immediately
totalRandomPath.unpersist() //unpersist old RDD which is no longer needed
totalRandomPath = tempRDD // point totalRandomPath to updated RDD
}
推荐阅读
- sap - Sap.Data.SQLAnywhere.SAException:使用 .NET 连接到 Sybase SQL Anywhere 时找不到语言资源文件 (dblgen17.dll)
- python - 使用 Pomegranate 拟合 Beta 分布
- gatsby - 使用 netlifyCMS 和 Gatsby “找不到后端”
- javascript - Laravel/Vue JS - 无法从 Android 设备上传文件
- r - 将数据框拆分为具有相同列数的 N 个子集
- excel - 如何匹配Excel中两个单独列中的返回相关单元格字段?
- java - Maven 无法解析正确的 SNAPSHOT 依赖项
- angular - 导出数组和导出模块有什么区别?
- java - 在具有多个参数类型约束的 Kotlin 中调用具有泛型类型的方法
- python - 如何使用 Python 跳过 n 行二进制标准输入?