scala - 如何在 Spark GraphX 中使用 Pregel 进行迭代操作?
问题描述
我正在使用 GraphX 绘制图表。我想以这样一种方式实现算法,即每个节点将其属性发送给其邻居并且每个邻居接收消息,并且当时只保存其分数最高的属性。例如,发送到节点的消息如下:(4,0.96),(8,0.1),(15,0.8),...
. 第一个数字是标签,第二个数字是该标签的分数。在这种情况下,将选择得分为 0.96 的标签 4,因为它当时得分最高。在算法结束时,每个节点都有一个列表,用于存储每次迭代中得分最高的标签。
我正在使用的图表具有以下结构:
(1,(11,0.2))
(2,(8,0.6))
(3,(5,0.3))
(4,(4,1))
...
键是 NodeID,属性由 a label
which isLong
和score
标签 which is组成Double
。
预期的最终结构类似于此结构:
(1,List((2,0.49),(8,0.9),(13,0.79)))
(2,List((11,0.89),(6,0.68),(13,0.79),(10,0.57)))
(3,List((20,0.0.8),(1,0.66)))
...
上面的结构意味着节点 1 收到了三个标签,分别是 2,8 和 13。
我正在尝试将 Pregel 用于我的算法,但我遇到了类型不匹配的一些问题。谁能帮我用 Pregel 实现代码?我会很感激的。
这是我正在尝试编写的代码,但我无法完成它!
def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VD, ED] = {
val temp_graph = graph
def sendMessage(e: EdgeTriplet[VertexId, ED]): Iterator[(VertexId,(Long,Double))] = {
Iterator((e.srcId,e.dstAttr), (e.dstId,e.srcAttr))
}
def mergeMessage() = {
}
def vertexProgram(vid: VertexId, attr: (Long, Double), message: (Long, Double)) = {
}
val initialMessage = (Long, Double)
Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
vprog = vertexProgram,
sendMsg = sendMessage,
mergeMsg = mergeMessage)
}
val new_updated_graph = run(new_graph,5)
解决方案
推荐阅读
- node.js - 节点 js 中的 Sage pay 集成
- python - 补丁颜色全部在补丁集合中重置
- reactjs - 如何使用 ReactJS 创建可重用的插件
- php - 致命错误:require_once():在 C:\Users\zu 中打开所需的 'C:\core/vendor/autoload.php' (include_path='C:\xampp\php\PEAR') 失败
- javascript - 使用订单项属性自定义订单确认电子邮件
- python - fileinput.input() 如何工作?
- c# - Json.NET 不会使用自定义 getter 和不可变类型反序列化属性
- node.js - 如何在 React Native 中从自己的 REST API 获取路由?
- amazon-ec2 - 如何在使用脚本创建新的 ec2 时避免 terraform 以前的 ec2 被破坏
- python - Python初学者无法导入模块