Spark: Cannot convert this array to unsafe format as it's too big

Problem description

I have a matrix like this:

0.0  0.4  0.4  0.0 
0.1  0.0  0.0  0.7 
0.0  0.2  0.0  0.3 
0.3  0.0  0.0  0.0

I want to write it to HDFS, so I imitated the save code in Spark's LogisticRegression. My (admittedly rough) code looks like this:

import org.apache.spark.ml.linalg.Matrix

private case class Data(unigram: Array[Double],
                        interceptVector: Matrix)

val data = Data(unigram.value, denseVector)
val df = sparkSession.createDataFrame(Seq(data))
df.repartition(1).write.mode("overwrite").parquet(bigramPath)

Everything works fine when the matrix is small, but when the matrix is large, Spark throws the error below:

Exception in thread "main" java.lang.UnsupportedOperationException: Cannot convert this array to unsafe format as it's too big.
    at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(UnsafeArrayData.java:447)
    at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(UnsafeArrayData.java:487)
    at org.apache.spark.ml.linalg.MatrixUDT.serialize(MatrixUDT.scala:66)
    at org.apache.spark.ml.linalg.MatrixUDT.serialize(MatrixUDT.scala:28)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$UDTConverter.toCatalystImpl(CatalystTypeConverters.scala:143)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
    at org.apache.spark.sql.catalyst.plans.logical.LocalRelation$$anonfun$fromProduct$1.apply(LocalRelation.scala:42)
    at org.apache.spark.sql.catalyst.plans.logical.LocalRelation$$anonfun$fromProduct$1.apply(LocalRelation.scala:42)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.plans.logical.LocalRelation$.fromProduct(LocalRelation.scala:42)
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:315)
    at com.wps.NgramModel2.save(Ngram2.scala:119)
    at com.wps.NgramDemo2$.main(NgramDemo2.scala:24)
    at com.wps.NgramDemo2.main(NgramDemo2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
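
For context: the exception comes from UnsafeArrayData.fromPrimitiveArray, which MatrixUDT.serialize uses to flatten the entire values array of the matrix into a single unsafe array. From my reading of the Spark 2.x source (so treat the exact constants below as an assumption), that method rejects any array whose unsafe layout would exceed roughly 2 GB, which for doubles means on the order of 268 million elements. A sketch of the arithmetic:

    // Rough reproduction of the size check in UnsafeArrayData.fromPrimitiveArray
    // (based on the Spark 2.x source; the exact constants are an assumption).
    val numElements = 20000L * 20000L                        // e.g. a 20k x 20k Double matrix
    val headerInBytes = 8 + ((numElements + 63) / 64) * 8    // null-tracking bitmap header
    val valueRegionInBytes = 8L * numElements                // 8 bytes per Double
    val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8
    val tooBig = totalSizeInLongs > Int.MaxValue / 8         // true -> the exception above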

What can I do? Split the matrix up and write it piece by piece? Or is there some better way?

I found that the following approach works well:

    import java.io.{BufferedWriter, OutputStreamWriter}

    import breeze.linalg.JavaArrayOps
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FSDataOutputStream

    // Flatten the breeze matrix into lines of space-separated values.
    val bigramArray = JavaArrayOps.dmDToArray2(bigram)
    val lines: Array[String] = bigramArray.map(_.mkString(" "))

    // HDFSUtil is a project-local helper that opens an output stream on HDFS.
    val hadoopConf: Configuration = new Configuration
    val outputStream: FSDataOutputStream =
      HDFSUtil.getFSDataOutputStream(bigramPath, "/part-00000", hadoopConf)
    val bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))

    try {
      lines.foreach(line => bufferedWriter.write(line + "\n"))
    } finally {
      bufferedWriter.close() // flush and release the HDFS stream
    }
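
For completeness, here is a minimal sketch of the "split it piece by piece" idea from the question: each matrix row is stored as its own Vector, so no single unsafe array comes anywhere near the cap. MatrixRow is a hypothetical helper, and bigram is assumed to be the same breeze DenseMatrix[Double] as above:

    import breeze.linalg.JavaArrayOps
    import org.apache.spark.ml.linalg.{Vector, Vectors}

    // Hypothetical helper: one matrix row per DataFrame row, keyed by its index.
    case class MatrixRow(index: Int, row: Vector)

    val rowData = JavaArrayOps.dmDToArray2(bigram)           // Array[Array[Double]]
      .zipWithIndex
      .map { case (values, i) => MatrixRow(i, Vectors.dense(values)) }

    val rowDf = sparkSession.createDataFrame(rowData.toSeq)
    rowDf.write.mode("overwrite").parquet(bigramPath)
    // To rebuild the matrix later, read the parquet back and sort by index.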

Tags: scala, apache-spark

Solution


IIUC, you want to save the matrix and load it back, the way an Estimator does.

Here is code that saves a dummy matrix and loads it back:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.ml.linalg.{Matrices, Matrix}
    import org.apache.spark.sql.Row

    case class Data(matrix: Matrix)

    // Wrap the matrix in a one-row DataFrame and write it out as parquet.
    def save(matrix: Matrix, path: String): Unit = {
      val data = Data(matrix)
      val df = spark.createDataFrame(Seq(data))
      val dataPath = new Path(path, "data").toString
      df.repartition(1).write.mode("overwrite").parquet(dataPath)
    }

    // Read the parquet back and extract the matrix from the single row.
    def load(path: String): Matrix = {
      val dataPath = new Path(path, "data").toString
      val df = spark.read.parquet(dataPath)
      val Row(matrix: Matrix) = df.select("matrix").head()
      matrix
    }

Tested with a 3 × 3 identity matrix:

    println("### input matrix ###")
    val matrixToSave = Matrices.eye(3)
    println(matrixToSave)
    save(matrixToSave, "/path/models/matrix")
    val matrixLoaded = load("/path/models/matrix")
    println("### Loaded matrix ###")
    println(matrixLoaded)

Output:

### input matrix ###
1.0  0.0  0.0  
0.0  1.0  0.0  
0.0  0.0  1.0  
### Loaded matrix ###
1.0  0.0  0.0  
0.0  1.0  0.0  
0.0  0.0  1.0  
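
If a plain 2-D array is needed again after loading, Matrix.rowIter (standard ml.linalg API) can rebuild it; a one-line sketch:

    // Rebuild a local Array[Array[Double]] from the loaded matrix.
    val asRows: Array[Array[Double]] = matrixLoaded.rowIter.map(_.toArray).toArray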

Hope this helps!

