scala - 如何将 RDD[(String, Iterable[VertexId])] 转换为 DataFrame?
问题描述
我创建了一个RDD
看起来Graphx
像这样的:
val graph = GraphLoader.edgeListFile(spark.sparkContext, fileName)
var s: VertexRDD[VertexId] = graph.connectedComponents().vertices
val nodeGraph: RDD[(String, Iterable[VertexId])] = s.groupBy(_._2) map { case (x, y) =>
val rand = randomUUID().toString
val clusterList: Iterable[VertexId] = y.map(_._1)
(rand, clusterList)
}
nodeGraph
是 type RDD[(String, Iterable[VertexId])]
,里面的数据是这样的:
(abc-def11, Iterable(1,2,3,4)),
(def-aaa, Iterable(10,11)),
...
我现在要做的是从中创建一个数据框,它应该如下所示:
col1 col2
abc-def11 1
abc-def11 2
abc-def11 3
abc-def11 4
def-aaa 10
def-aaa 11
如何在 Spark 中做到这一点?
解决方案
toDF()
首先,使用您想要的列名将RDD 转换为数据框。这最容易通过更改Iterable[VertexId]
为Seq[Long]
first 来完成。
import spark.implicits._
val df = nodeGraph.map(x => (x._1, x._2.map(_.toLong).toSeq)).toDF("col1", "col2")
请注意,这可以在创建nodeGraph
以保存步骤时完成。接下来,使用explode
函数来展平数据框,
val df2 = df.withColumn("col2", explode($"col2"))
这将为您提供所需的输出。
推荐阅读
- apache-spark - 为什么使用 Hadoop 为什么我们有 Spark?
- elasticsearch - 是否有过滤掉 Elasticsearch 查询响应的选项,例如 _shards?
- xaml - 如何在 UWP 中为 HighContrast 主题设置 TextBlock 背景颜色
- python - 忽略某些值对 pandas DataFrame 进行排序
- botframework - 如何更改发布卡片的 Teams 机器人的通知文本?
- php - wordpress 从复选框数组中输出数字
- docker - Java Gradle 构建在 docker 中花费了太多时间
- git - 我可以在不提交的情况下切换分支吗?
- msbuild - 如何在单个构建中为 2 个平台构建项目
- php - 由于 503 错误,多个 php 脚本运行了几天