首页 > 解决方案 > 如何将 RDD[(String, Iterable[VertexId])] 转换为 DataFrame?

问题描述

我创建了一个RDD看起来Graphx像这样的:

val graph = GraphLoader.edgeListFile(spark.sparkContext, fileName)
var s: VertexRDD[VertexId] = graph.connectedComponents().vertices

val nodeGraph: RDD[(String, Iterable[VertexId])] = s.groupBy(_._2) map { case (x, y) =>
  val rand = randomUUID().toString
  val clusterList: Iterable[VertexId] = y.map(_._1)
  (rand, clusterList)
}

nodeGraph是 type RDD[(String, Iterable[VertexId])],里面的数据是这样的:

(abc-def11, Iterable(1,2,3,4)), 
(def-aaa, Iterable(10,11)), 
...

我现在要做的是从中创建一个数据框,它应该如下所示:

col1        col2
abc-def11   1
abc-def11   2
abc-def11   3
abc-def11   4
def-aaa     10
def-aaa     11

如何在 Spark 中做到这一点?

标签: scalaapache-sparkdataframeapache-spark-sqlspark-graphx

解决方案


toDF()首先,使用您想要的列名将RDD 转换为数据框。这最容易通过更改Iterable[VertexId]Seq[Long]first 来完成。

import spark.implicits._
val df = nodeGraph.map(x => (x._1, x._2.map(_.toLong).toSeq)).toDF("col1", "col2")

请注意,这可以在创建nodeGraph以保存步骤时完成。接下来,使用explode函数来展平数据框,

val df2 = df.withColumn("col2", explode($"col2"))

这将为您提供所需的输出。


推荐阅读