Spark: expected type TraversableOnce

Problem description

val num_idf_pairs = rescaledData.select("item", "features")
    .rdd.map(x => {(x(0), x(1))})

val itemRdd = rescaledData.select("item", "features").where("item = 1")
    .rdd.map(x => {(x(0), x(1))})

val b_num_idf_pairs = sparkSession.sparkContext.broadcast(num_idf_pairs.collect())


val sims = num_idf_pairs.flatMap {
  case (key, value) =>
    val sv1 = value.asInstanceOf[SV]
    import breeze.linalg._
    val valuesVector = new SparseVector[Double](sv1.indices, sv1.values, sv1.size)
    itemRdd.map {
      case (id2, idf2) =>
        val sv2 = idf2.asInstanceOf[SV]
        val xVector = new SparseVector[Double](sv2.indices, sv2.values, sv2.size)
        val sim = valuesVector.dot(xVector) / (norm(valuesVector) * norm(xVector))
        (id2.toString, key.toString, sim)
    }
}

The error is: expected type TraversableOnce.

When I modify it as follows:

val b_num_idf_pairs = sparkSession.sparkContext.broadcast(num_idf_pairs.collect())
val docSims = num_idf_pairs.flatMap {
  case (id1, idf1) =>
    val idfs = b_num_idf_pairs.value.filter(_._1 != id1)
    val sv1 = idf1.asInstanceOf[SV]
    import breeze.linalg._
    val bsv1 = new SparseVector[Double](sv1.indices, sv1.values, sv1.size)
    idfs.map {
      case (id2, idf2) =>
        val sv2 = idf2.asInstanceOf[SV]
        val bsv2 = new SparseVector[Double](sv2.indices, sv2.values, sv2.size)
        val cosSim = bsv1.dot(bsv2).asInstanceOf[Double] / (norm(bsv1) * norm(bsv2))
        (id1.toString(), id2.toString(), cosSim)
    }
}

it compiles, but it causes an OutOfMemoryError. I set --executor-memory 4G.

Tags: scala, apache-spark

Solution


The first snippet:

num_idf_pairs.flatMap { 
   ...
   itemRdd.map { ...}
} 

is not only invalid Spark code (nested transformations are not allowed), but, as you already know, it doesn't type check, because RDD is not TraversableOnce.
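The type error can be reproduced with plain Scala collections: flatMap requires its function to return a TraversableOnce, which local collections like Seq satisfy but RDD does not. A minimal sketch (the names and sample data here are illustrative, not from the original question):

```scala
// flatMap's function must return a TraversableOnce. A local Seq
// qualifies, so this compiles; returning an RDD from the inner
// lambda would not, which is exactly the reported error.
object FlatMapSketch {
  val pairs = Seq(("a", 1), ("b", 2))
  val items = Seq(("x", 10), ("y", 20))

  def crossed: Seq[(String, String)] =
    pairs.flatMap { case (k, _) =>
      items.map { case (id, _) => (id, k) } // inner result is a local Seq
    }

  def main(args: Array[String]): Unit = println(crossed)
}
```

Even if it did type check, the inner itemRdd.map would still be illegal at runtime, because Spark forbids referencing one RDD inside a transformation of another.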

The second snippet likely fails because the data you are trying to collect and broadcast is too large.
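A rough back-of-the-envelope sketch shows why: broadcasting num_idf_pairs.collect() ships every sparse vector to every executor, and each stored non-zero costs roughly 4 bytes for the Int index plus 8 bytes for the Double value, before any JVM object overhead. The vector counts below are assumed figures for illustration, not taken from the question:

```scala
// Rough lower bound on the size of a broadcast of sparse vectors:
// each non-zero entry needs an Int index (4 bytes) and a Double
// value (8 bytes); object headers and array overhead add more.
object BroadcastSizeSketch {
  def approxBytes(numVectors: Long, nonZerosPerVector: Long): Long =
    numVectors * nonZerosPerVector * (4L + 8L)

  def main(args: Array[String]): Unit = {
    // e.g. one million items with ~1000 non-zero TF-IDF terms each
    val bytes = approxBytes(1000000L, 1000L)
    println(s"~${bytes / (1L << 30)} GiB") // well above a 4G executor
  }
}
```

With numbers in that range the broadcast alone exceeds --executor-memory 4G, so the OutOfMemoryError is expected regardless of how the rest of the job is written.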

It looks like you're trying to find similarities between all items, so you need a Cartesian product, and should structure your code roughly as follows:

num_idf_pairs
  .cartesian(itemRdd)
  .filter { case ((id1, _), (id2, _)) => id1 != id2 }
  .map { case ((id1, idf1), (id2, idf2)) =>
    val cosSim = ??? // Compute similarity
    (id1.toString, id2.toString, cosSim)
  }
