首页 > 解决方案 > 如何展平序列的 RDD

问题描述

我目前有一个RDD[Seq[MatrixEntry]]我试图RDD[MatrixEntry]通过展开或展平Seq. 我可以将 转换Seq为其他数据类型,例如List,但我无法完全删除它。

我的问题与此惊人地相似,因为我可以转换Seq, 但MatrixEntry不是 Ints,但尝试类似的解决方案会产生

error: type mismatch;
 found   : org.apache.spark.mllib.linalg.distributed.MatrixEntry
 required: scala.collection.GenTraversableOnce[?]

我试图运行的代码试图将 DataFrame 转换为 CoordinateMatrix:

def matmaker(data: DataFrame):Unit={
    val rrd:RDD[(List[(Any,Int)],Long)] = data.rdd
        .map(r => r.toSeq.toList.zipWithIndex)
        .zipWithIndex()
    val precord:RDD[MatrixEntry] = rrd.map{
      case ((s:Seq[(Any,Int)],r:Long)) => s.map{
        case (value:Any,c:Int) => MatrixEntry(r,c,value.toString.toDouble)
      }.flatMap(List => List) //Problem Here
    }
    precord.foreach(println)
    //new CoordinateMatrix(precord.map(r=>r.take(r.length-1)))
  }

完成测试后,函数的最终类型将从 更改为UnitCoordinateMatrix

标签: scalaapache-sparksequencerdd

解决方案


您的平面图只是放错了位置。

而不是}.flatMap(x=>x)}你需要这样写}}.flatMap(x=>x)

def matmaker(data: DataFrame):CoordinateMatrix={
    val rrd:RDD[(List[(Any,Int)],Long)] = data.rdd
        .map(r => r.toSeq.toList.zipWithIndex)
        .zipWithIndex()
    val precord:RDD[MatrixEntry] = rrd.map{
        case ((s:Seq[(Any,Int)],r:Long)) => s.map{
            case (value:Any,c:Int) => MatrixEntry(r,c,value.toString.toDouble)
        }//No Problem Here, flatmap was moved
    }.flatMap(x => x) 
    new CoordinateMatrix(precord)
}

推荐阅读