首页 > 解决方案 > 在 Spark Scala 中注册前一个 DF 后创建一个 DF

问题描述

我是 Spark Scala 的新开发人员,我想问你我的问题。

我有两个巨大的数据框,我的第二个数据框是从第一个数据框计算出来的(它包含与第一个数据框不同的列)。

为了优化我的代码,我想到了这种方法:

所以,它写了这个:

//val temp1 is my first DF
writeAsTextFileAndMerge("result1.csv", "/user/result", temp1, spark.sparkContext.hadoopConfiguration)

val temp2 = spark.read.options(Map("header" -> "true", "delimiter" -> ";"))
      .csv("/user/result/result1.csv").select("ID").distinct

    writeAsTextFileAndMerge("result2.csv", "/user/result",
      temp2, spark.sparkContext.hadoopConfiguration)

这是我的保存功能:

def writeAsTextFileAndMerge(fileName: String, outputPath: String, df: DataFrame, conf: Configuration) {
    val sourceFile = WorkingDirectory
    df.write.options(Map("header" -> "true", "delimiter" -> ";")).mode("overwrite").csv(sourceFile)
    merge(fileName, sourceFile, outputPath, conf)
  }

  def merge(fileName: String, srcPath: String, dstPath: String, conf: Configuration) {
    val hdfs = FileSystem.get(conf)
    val destinationPath = new Path(dstPath)
    if (!hdfs.exists(destinationPath))
      hdfs.mkdirs(destinationPath)
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName),
      true, conf, null)
  }

对我来说这似乎是“合乎逻辑的”,但我这样做时出错了。我想 Spark 不可能“等待”直到在 HDFS 中注册我的第一个 DF 并在读取这个新文件之后(或者我的保存功能可能有一些错误?)。

这是我得到的例外:

19/02/16 17:27:56 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.ArrayIndexOutOfBoundsException: 1
java.lang.ArrayIndexOutOfBoundsException: 1

你能帮我解决这个问题吗?

标签: scalaapache-sparkdataframe

解决方案


问题在于合并 - Spark 不知道,因此与您正在进行的所有 HDFS 操作不同步。

好消息是您不需要这样做。只需执行 df.write,然后使用读取创建一个新的数据帧(spark 会将所有部分读入单个 df)

即以下将工作得很好

temp1.write.options(Map("header" -> "true", "delimiter" -> ";")).mode("overwrite").csv("/user/result/result1.csv")
val temp2 = spark.read.options(Map("header" -> "true", "delimiter" -> ";"))
      .csv("/user/result/result1.csv").select("ID").distinct

推荐阅读