scala - Spark: Cannot convert this array to unsafe format as it's too big
Problem description
I have a matrix like this:
0.0 0.4 0.4 0.0
0.1 0.0 0.0 0.7
0.0 0.2 0.0 0.3
0.3 0.0 0.0 0.0
I want to write it to HDFS. Imitating the save logic in the source code of Spark's LogisticRegression, I ended up with this clumsy code:
private case class Data(unigram: Array[Double],
                        interceptVector: Matrix)

val data = Data(unigram.value, denseVector)
val df = sparkSession.createDataFrame(Seq(data))
df.repartition(1).write.mode("overwrite").parquet(bigramPath)
Everything works fine when the matrix is small, but when the matrix is large, Spark throws the error below:
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot convert this array to unsafe format as it's too big.
at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(UnsafeArrayData.java:447)
at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(UnsafeArrayData.java:487)
at org.apache.spark.ml.linalg.MatrixUDT.serialize(MatrixUDT.scala:66)
at org.apache.spark.ml.linalg.MatrixUDT.serialize(MatrixUDT.scala:28)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$UDTConverter.toCatalystImpl(CatalystTypeConverters.scala:143)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
at org.apache.spark.sql.catalyst.plans.logical.LocalRelation$$anonfun$fromProduct$1.apply(LocalRelation.scala:42)
at org.apache.spark.sql.catalyst.plans.logical.LocalRelation$$anonfun$fromProduct$1.apply(LocalRelation.scala:42)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.plans.logical.LocalRelation$.fromProduct(LocalRelation.scala:42)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:315)
at com.wps.NgramModel2.save(Ngram2.scala:119)
at com.wps.NgramDemo2$.main(NgramDemo2.scala:24)
at com.wps.NgramDemo2.main(NgramDemo2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
What can I do? Split the matrix up and write it piece by piece? Or is there a better approach?
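For context: the stack trace shows `MatrixUDT.serialize` calling `UnsafeArrayData.fromPrimitiveArray`, which packs every value of the matrix into one contiguous byte buffer indexed by a Java `int`, so the whole serialized matrix must stay under `Integer.MAX_VALUE` bytes. A rough feasibility check is easy to sketch in plain Scala (per-element header overhead in Spark's internal format is ignored here, so this is an approximation, not the exact limit Spark enforces):

```scala
// Rough feasibility check: MatrixUDT serializes all values of a matrix
// into one UnsafeArrayData buffer whose total byte size must fit in an Int.
// Each Double takes 8 bytes; header/null-bitmap overhead is ignored.
def approxSerializedBytes(rows: Long, cols: Long): Long = rows * cols * 8L

def fitsInUnsafeArray(rows: Long, cols: Long): Boolean =
  approxSerializedBytes(rows, cols) < Int.MaxValue

println(fitsInUnsafeArray(1000L, 1000L))    // true  (~8 MB of doubles)
println(fitsInUnsafeArray(20000L, 20000L))  // false (~3.2 GB of doubles)
```

Any matrix whose raw values alone approach 2 GB cannot be stored as a single `Matrix` column, regardless of executor memory.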
I found that this approach works well (`JavaArrayOps` is Breeze's converter; `HDFSUtil` is my own helper; note the writer is now closed in a finally block):
import java.io.{BufferedWriter, OutputStreamWriter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FSDataOutputStream
import breeze.linalg.JavaArrayOps

val bigramArray = JavaArrayOps.dmDToArray2(bigram)
val lines: Array[String] = bigramArray.map(line => line.mkString(" "))

val hadoopConf: Configuration = new Configuration
val outputStream: FSDataOutputStream =
  HDFSUtil.getFSDataOutputStream(bigramPath, "/part-00000", hadoopConf)
val bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
try {
  lines.foreach(line => bufferedWriter.write(line + "\n"))
} finally {
  bufferedWriter.close()
}
Solution
IIUC, you want to save a matrix and load it back, the way an Estimator does. Below is code that saves a dummy matrix and loads it again:
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.linalg.{Matrices, Matrix}
import org.apache.spark.sql.Row

case class Data(matrix: Matrix)

def save(matrix: Matrix, path: String): Unit = {
  val data = Data(matrix)
  val df = spark.createDataFrame(Seq(data))
  val dataPath = new Path(path, "data").toString
  df.repartition(1).write.mode("overwrite").parquet(dataPath)
}

def load(path: String): Matrix = {
  val dataPath = new Path(path, "data").toString
  val df = spark.read.parquet(dataPath)
  val Row(matrix: Matrix) = df.select("matrix").head()
  matrix
}
Testing it with a 3 x 3 identity matrix:
println("### input matrix ###")
val matrixToSave = Matrices.eye(3)
println(matrixToSave)
save(matrixToSave, "/path/models/matrix")
val matrixLoaded = load("/path/models/matrix")
println("### Loaded matrix ###")
println(matrixLoaded)
Output:
### input matrix ###
1.0 0.0 0.0
0.0 1.0 0.0
0.0 0.0 1.0
### Loaded matrix ###
1.0 0.0 0.0
0.0 1.0 0.0
0.0 0.0 1.0
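One caveat: the save/load above still stores the matrix as a single `Matrix` column, so a matrix big enough to trip the `UnsafeArrayData` limit will fail the same way. For such matrices, the splitting idea from the question is the way out: write the matrix as many records, each holding a block of rows, so no single record approaches the limit. A minimal sketch of the chunking arithmetic in plain Scala (the surrounding DataFrame schema with an `Array[Double]`-per-row-block column is an assumption, not shown):

```scala
// Sketch: split a dense row-major matrix into blocks of whole rows such
// that no block's raw values exceed maxBytesPerBlock (8 bytes per Double).
// Each block would then be written as one record instead of one huge one.
def rowBlocks(matrix: Array[Array[Double]],
              maxBytesPerBlock: Long): Seq[Array[Array[Double]]] = {
  val bytesPerRow = matrix.headOption.map(_.length * 8L).getOrElse(0L)
  val rowsPerBlock =
    math.max(1L, maxBytesPerBlock / math.max(1L, bytesPerRow)).toInt
  matrix.grouped(rowsPerBlock).toSeq
}

// 4 x 4 matrix, 32 bytes per row; a 64-byte budget gives 2 rows per block.
val m = Array.tabulate(4, 4)((i, j) => i * 4.0 + j)
println(rowBlocks(m, maxBytesPerBlock = 64L).length)  // 2 blocks
```

Loading is the reverse: read all blocks in row order and concatenate them back into one matrix.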
Hope this helps!