首页 > 解决方案 > 如何在 GraphX 的聚合消息中使用组合器

问题描述

在 GraphX 聚合消息 API

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

但是我想修改合并阶段的返回类型,这意味着我想要类似的东西,combineByKey而不是reduceByKey,我该如何基于 GraphX 的优势来做到这一点?或者换句话说,我怎么能只使用这个函数的结果sendMsg并跳过mergeMsg这个函数的阶段?

我的意思是 GraphX 的优点是“以顶点为中心”,如果我使用它mapcombineByKey运行它会进行全局洗牌,这会花费大量时间,与“以顶点为中心”的想法背道而驰

标签: apache-sparkspark-graphx

解决方案


collectEdgesin GraphOpsAPI 可能会有所帮助。

它收集每个顶点的相邻边并可以返回一个VertexRDD[Array[Edge[ED]]]类型,这意味着它会同时更改返回类型并收集消息,请参阅API 文档


推荐阅读