apache-spark - 从 `String` 对生成 `VertexId`
问题描述
我正在使用 GraphX 在 Spark 上处理一些图形数据。输入数据为RDD[(String, String)]
。我使用以下代码片段来映射String
和VertexId
构建图表。
val input: RDD[(String, String)] = ...
val vertexIds = input.map(_._1)
.union(input.map(_._2))
.distinct()
.zipWithUniqueId()
.cache()
val edges = input.join(vertexIds)
.map { case (u, (v, uid)) => (v, uid) }
.join(vertexIds)
.map { case (v, (uid, vid)) => Edge(uid, vid, 1) }
val graph = Graph(vertexIds.map { case (v, vid) => (vid, v) }, edges )
当我抽查前 1000 个最高度数的节点时,我发现 GraphX 的结果与原始输入不同。这是我转储高度节点的方法
graph.outerJoinVertices(graph.outDegrees) {
(_, vdata, deg) => (deg.getOrElse(0L), vdata)
}.vertices.map(_._2).top(1000).saveTo(....)
我怀疑.zipWithUniqueId
每次评估都会给出不稳定的 ID。我试过了
- insert
vertexIds.count()
强制实现,这样vertexIds
就不会被重新评估。 - 插入
.sortBy(...).zipWithUniqueId()
以确保顺序相同。
他们都没有解决问题。每次运行的前 1000 度节点的结果略有不同。
解决方案
我找到了两种稳定String -> VertexId
映射的解决方案:
坚持
vertexIds
到FS。input.map(_._1) .union(input.map(_._2)) .distinct() .zipWithUniqueId() .saveAsObjectFile("some location") val vertexId = sc.objectFile("some location")
使用抗碰撞哈希函数。我使用Guava的 murmur3_128 哈希并将前 8 个字节作为 vertexId。使用这种方法,您无需进行任何进一步的连接,这样效率更高。
推荐阅读
- c# - 如何在 C# 中的 DOC 文件上添加水印?不是 DOCX 文件
- javascript - 我试图从颜色选择器中获取颜色代码
- android - TextView 和 ProgressBar 值在当前活动的回击后未更新然后返回该活动
- java - 使用 Rest Assured GET 方法获取错误代码 503
- angular8 - 模拟函数调用时如何检查发射值
- java - 如何在 RestTemplate 上应用 PowerMockito
- reactjs - 如何在 React 功能组件中添加事件
- google-sheets - Google表格:创建条形图以显示每月每天工作的所有人的每日总数
- c++ - 从 RC 文件访问字符串?
- c - 如何搜索数组中的数字?