首页 > 解决方案 > 从 `String` 对生成 `VertexId`

问题描述

我正在使用 GraphX 在 Spark 上处理一些图形数据。输入数据为RDD[(String, String)]。我使用以下代码片段来映射StringVertexId构建图表。

val input: RDD[(String, String)] = ...

val vertexIds = input.map(_._1)
                     .union(input.map(_._2))
                     .distinct()
                     .zipWithUniqueId()
                     .cache()

val edges = input.join(vertexIds)
                 .map { case (u, (v, uid)) => (v, uid) }
                 .join(vertexIds)
                 .map { case (v, (uid, vid)) => Edge(uid, vid, 1) }

val graph = Graph(vertexIds.map { case (v, vid) => (vid, v) }, edges )

当我抽查前 1000 个最高度数的节点时,我发现 GraphX 的结果与原始输入不同。这是我转储高度节点的方法

graph.outerJoinVertices(graph.outDegrees) {
  (_, vdata, deg) => (deg.getOrElse(0L), vdata)
}.vertices.map(_._2).top(1000).saveTo(....)

我怀疑.zipWithUniqueId每次评估都会给出不稳定的 ID。我试过了

他们都没有解决问题。每次运行的前 1000 度节点的结果略有不同。

标签: apache-sparkspark-graphx

解决方案


我找到了两种稳定String -> VertexId映射的解决方案:

  • 坚持vertexIds到FS。

    input.map(_._1)
         .union(input.map(_._2))
         .distinct()
         .zipWithUniqueId()
         .saveAsObjectFile("some location")
    val vertexId = sc.objectFile("some location")
    
  • 使用抗碰撞哈希函数。我使用Guava的 murmur3_128 哈希并将前 8 个字节作为 vertexId。使用这种方法,您无需进行任何进一步的连接,这样效率更高。


推荐阅读