scala - Spark 不符合预期类型 TraversableOnce
问题描述
val num_idf_pairs = rescaledData.select("item", "features")
.rdd.map(x => {(x(0), x(1))})
val itemRdd = rescaledData.select("item", "features").where("item = 1")
.rdd.map(x => {(x(0), x(1))})
val b_num_idf_pairs = sparkSession.sparkContext.broadcast(num_idf_pairs.collect())
val sims = num_idf_pairs.flatMap {
case (key, value) =>
val sv1 = value.asInstanceOf[SV]
import breeze.linalg._
val valuesVector = new SparseVector[Double](sv1.indices, sv1.values, sv1.size)
itemRdd.map {
case (id2, idf2) =>
val sv2 = idf2.asInstanceOf[SV]
val xVector = new SparseVector[Double](sv2.indices, sv2.values, sv2.size)
val sim = valuesVector.dot(xVector) / (norm(valuesVector) * norm(xVector))
(id2.toString, key.toString, sim)
}
}
错误是不符合预期的类型TraversableOnce
。
当我修改如下:
val b_num_idf_pairs = sparkSession.sparkContext.broadcast(num_idf_pairs.collect())
val docSims = num_idf_pairs.flatMap {
case (id1, idf1) =>
val idfs = b_num_idf_pairs.value.filter(_._1 != id1)
val sv1 = idf1.asInstanceOf[SV]
import breeze.linalg._
val bsv1 = new SparseVector[Double](sv1.indices, sv1.values, sv1.size)
idfs.map {
case (id2, idf2) =>
val sv2 = idf2.asInstanceOf[SV]
val bsv2 = new SparseVector[Double](sv2.indices, sv2.values, sv2.size)
val cosSim = bsv1.dot(bsv2).asInstanceOf[Double] / (norm(bsv1) * norm(bsv2))
(id1.toString(), id2.toString(), cosSim)
}
}
它可以编译,但这会导致OutOfMemoryException
. 我设置--executor-memory 4G
。
解决方案
第一个片段:
num_idf_pairs.flatMap {
...
itemRdd.map { ...}
}
is 不仅不是有效的 Spark 代码(不允许嵌套转换),而且如您所知,不会进行类型检查,因为RDD
is not TraversableOnce
.
第二个片段可能会失败,因为您尝试收集和广播的数据太大。
看起来您正在尝试查找所有项目的相似性,因此您需要笛卡尔积,并大致如下构造您的代码:
num_idf_pairs
.cartesian(itemRdd)
.filter { case ((id1, idf1), (id2, idf2)) => id1 != id2 }
.map { case ((id1, idf1), (id2, idf2)) => {
val cosSim = ??? // Compute similarity
(id1.toString(), id2.toString(), cosSim)
}}
推荐阅读
- java - 使用 Class 作为 HashMap 的键会导致不良影响吗?
- android - 如何在地图上显示带有地址的开始 - 目的地屏幕
- python - 仅获取标签名称,而不使用 python beautifulsoup 获取文本
- android - 在android中等效的foreach循环是什么样的
- mysql - 可以触发访问远程 mysql 数据库。假设远程服务器没有火花并且可以触发访问数据库视图
- c# - System.Threading.Tasks.Task`1.GetResultCore
- javascript - 如何正确地将 d3js 集成到 Angular 中?
- python - DatetimeField 和 strftime 的问题
- c# - Autodesk Inventor 附加模块被阻止
- javascript - 从挂载函数内部调用 Vue 方法