apache-spark - 对 Spark/Graphx/Pregel 示例程序的停止条件感到困惑,以查找“路径距离”
问题描述
'
我正在通过 Graphx In Action 工作,这本书(源代码在这里:https ://github.com/insidectm/spark-graphx-in-action )讨论了两种计算距离的方法(边缘跳数) 在树的根和所有节点到叶子之间。我了解使用 aggregateMessages 提供的代码示例。特别是,停止条件是有意义的(我通过包含文本“停止条件”的注释突出显示了该条件,如下。)一旦图形顶点上的属性停止变化,继续运行算法。
当我查看 Pregel 计算相同结果的方式时,我有点困惑(如下所示。)
特别是在调用 Pregel 的 apply 方法时,maxIterations 是默认值,即 Integer.MAX_VALUE(出于所有实际目的,“永远运行”。)因此,“sendMsg”函数似乎是:
(et:EdgeTriplet[Int,String]) =>
Iterator((et.dstId, et.srcAttr+1)),
将被无限调用,即使在顶点上的值已经收敛之后。
是否有一些我忽略的机制导致程序在收敛后停止?
// aggregateMessages approach
// from: https://github.com/insidedctm/spark-graphx-in-action/blob/51e4c667b927466bd02a0a027ca36625b010e0d6/Chapter04/Listing4_10IteratedFurthestVertex.scala
def sendMsg(ec: EdgeContext[Int,String,Int]): Unit = {
ec.sendToDst(ec.srcAttr+1)
}
def mergeMsg(a: Int, b: Int): Int = {
math.max(a,b)
}
def propagateEdgeCount(g:Graph[Int,String])
:Graph[Int,String] = {
val verts =
g.aggregateMessages[Int](sendMsg, mergeMsg)
val g2 =
Graph(verts, g.edges)
val check =
g2.vertices.join(g.vertices).
map(x => x._2._1 – x._2._2).
reduce(_ + _)
// STOP CONDITION
// check here ensures stop if nothing changed (******)
if (check > 0)
propagateEdgeCount(g2)
else
g
}
// Pregel approach
val g = Pregel(myGraph.mapVertices((vid,vd) => 0), 0,
activeDirection = EdgeDirection.Out)(
(id:VertexId,vd:Int,a:Int) => math.max(vd,a),
(et:EdgeTriplet[Int,String]) =>
Iterator((et.dstId, et.srcAttr+1)),
(a:Int,b:Int) => math.max(a,b))
g.vertices.collect
解决方案
据我所知,如果所有节点都停止,那么 pregel 将自行停止工作。
停止所有节点的方法有两种,可以通过所有节点的属性不再改变来实现:
1.给出发送消息的条件,即如果给定条件为假,节点将停止发送消息。
2.给出一个函数,所有节点在经过多次迭代后都会停止,也就是说,虽然发送消息的条件仍然为真,但所有节点的属性都没有改变。
val bfs2 = initialGraph2.pregel(Double.PositiveInfinity)( (id, attr, msg) => math.min(attr, msg), triplet => { if (triplet.srcAttr != Double.PositiveInfinity && triplet.dstAttr == Double.PositiveInfinity) {Iterator((triplet.dstId, triplet.srcAttr+1))} else {Iterator.empty}}, (a,b) => math.min(a,b) ).cache()
"triplet.dstAttr == Double.PositiveInfinity"
是继续条件。
如果所有节点都小于 Double.PositiveInfinity,则发送消息动作将停止,显然,所有节点都会停止。
推荐阅读
- javascript - 从 JSON 字典中获取键值
- ruby - 在消息中打印“可用方法”电报 API Ruby
- c++ - 如何修复包含必须返回值的错误?
- javascript - React Router 与 Private Route 渲染空白页面
- http - https是如何识别的?
- c# - C#轻量级骨架解析文件
- javascript - 如何打破输入的javascript操作并开始新的操作
- push-notification - 在聊天应用程序中使用新消息更新用户
- java - android- Unable to send selected media to uri // java.io.File 不能应用于 android.net.Uri
- selection - Bokeh 1.0.4 - 使用交互式图例跨多个字形选择不包括所有字形