首页 > 解决方案 > 如何获取/构建 JavaRDD[DataSet]?

问题描述

当我使用 deeplearning4j 并尝试在 Spark 中训练模型时

public MultiLayerNetwork fit(JavaRDD<DataSet> trainingData)

fit() 需要一个 JavaRDD 参数,我尝试像这样构建

    val totalDaset = csv.map(row => {
      val features = Array(
        row.getAs[String](0).toDouble, row.getAs[String](1).toDouble
      )
      val labels = Array(row.getAs[String](21).toDouble)
      val featuresINDA = Nd4j.create(features)
      val labelsINDA = Nd4j.create(labels)
      new DataSet(featuresINDA, labelsINDA)
    })

但是 IDEA 的提示是No implicit arguments of type:Encode[DataSet]
这是一个错误,我不知道如何解决这个问题,
我知道 SparkRDD 可以转换为 JavaRDD,但我不知道如何构建 Spark RDD[DataSet]
DataSet 在import org.nd4j.linalg.dataset.DataSet
它的构造方法是

    public DataSet(INDArray first, INDArray second) {
        this(first, second, (INDArray)null, (INDArray)null);
    }

这是我的代码

val spark:SparkSession = {SparkSession
      .builder()
      .master("local")
      .appName("Spark LSTM Emotion Analysis")
      .getOrCreate()
    }
    import spark.implicits._
    val JavaSC = JavaSparkContext.fromSparkContext(spark.sparkContext)

    val csv=spark.read.format("csv")
      .option("header","true")
      .option("sep",",")
      .load("/home/hadoop/sparkjobs/LReg/data.csv")

    val totalDataset = csv.map(row => {
      val features = Array(
        row.getAs[String](0).toDouble, row.getAs[String](1).toDouble
      )
      val labels = Array(row.getAs[String](21).toDouble)
      val featuresINDA = Nd4j.create(features)
      val labelsINDA = Nd4j.create(labels)
      new DataSet(featuresINDA, labelsINDA)
    })

    val data = totalDataset.toJavaRDD

在 deeplearning4j 官方指南中通过 Java 创建 JavaRDD[DataSet]:

String filePath = "hdfs:///your/path/some_csv_file.csv";
JavaSparkContext sc = new JavaSparkContext();
JavaRDD<String> rddString = sc.textFile(filePath);
RecordReader recordReader = new CSVRecordReader(',');
JavaRDD<List<Writable>> rddWritables = rddString.map(new StringToWritablesFunction(recordReader));

int labelIndex = 5;         //Labels: a single integer representing the class index in column number 5
int numLabelClasses = 10;   //10 classes for the label
JavaRDD<DataSet> rddDataSetClassification = rddWritables.map(new DataVecDataSetFunction(labelIndex, numLabelClasses, false));

我尝试通过 scala 创建:

    val JavaSC: JavaSparkContext = new JavaSparkContext()
    val rddString: JavaRDD[String] = JavaSC.textFile("/home/hadoop/sparkjobs/LReg/hf-data.csv")
    val recordReader: CSVRecordReader = new CSVRecordReader(',')
    val rddWritables: JavaRDD[List[Writable]] = rddString.map(new StringToWritablesFunction(recordReader))
    val featureColnum = 3
    val labelColnum = 1
    val d = new DataVecDataSetFunction(featureColnum,labelColnum,true,null,null)
//    val rddDataSet: JavaRDD[DataSet] = rddWritables.map(new DataVecDataSetFunction(featureColnum,labelColnum, true,null,null))
// can not reslove overloaded method 'map'

调试错误信息:

在此处输入图像描述

标签: rddapache-spark-datasetnd4j

解决方案


DataSet 只是一对 INDArray。(输入和标签)我们的文档深入介绍了这一点: https ://deeplearning4j.konduit.ai/distributed-deep-learning/data-howto

为了堆栈溢出,我将总结这里的内容,因为没有“1”的方式来创建数据管道。这与你的问题有关。这与您在本地创建数据集的方式非常相似,通常您希望在本地执行任何操作并将其放入函数中。

例如,CSV 和图像将会非常不同。但通常你使用 datavec 库来做到这一点。文档总结了每种方法的方法。


推荐阅读