Merge schema with int and double cannot be resolved when reading parquet files

Problem Description

I have two parquet files, one containing an integer field myField and the other containing a double field myField. When attempting to read both files at once with

val basePath = "/path/to/file/"
val fileWithInt = basePath + "intFile.snappy.parquet"
val fileWithDouble = basePath + "doubleFile.snappy.parquet"
val result = spark.sqlContext.read.option("mergeSchema", true).option("basePath", basePath).parquet(Seq(fileWithInt, fileWithDouble): _*).select("myField")

I receive the following error:

Caused by: org.apache.spark.SparkException: Failed to merge fields 'myField' and 'myField'. Failed to merge incompatible data types IntegerType and DoubleType

When passing an explicit schema,

val schema = StructType(Seq(new StructField("myField", IntegerType)))
val result = spark.sqlContext.read.schema(schema).option("mergeSchema", true).option("basePath", basePath).parquet(Seq(fileWithInt, fileWithDouble): _*).select("myField")

it fails with the following:

java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
    at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)

When casting to a double instead,

val schema = StructType(Seq(new StructField("myField", DoubleType)))

I get

java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
    at org.apache.parquet.column.Dictionary.decodeToDouble(Dictionary.java:60)

Other than reprocessing the source data, does anyone know of any way around this problem?

Tags: scala, apache-spark, apache-spark-sql

Solution


Depending on how many files you need to read, you can use one of the following two approaches.

This one works best for a smaller number of parquet files:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.DoubleType

def merge(spark: SparkSession, paths: Seq[String]): DataFrame = {
  import spark.implicits._

  // Read each file separately, cast myField to double, then union the DataFrames.
  paths.par.map { path =>
    spark.read.parquet(path).withColumn("myField", $"myField".cast(DoubleType))
  }.reduce(_.union(_))
}
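
A minimal usage sketch, assuming a live SparkSession named spark and the two file names from the question:

val basePath = "/path/to/file/"
val merged = merge(spark, Seq(
  basePath + "intFile.snappy.parquet",
  basePath + "doubleFile.snappy.parquet"))
merged.select("myField").show()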

This approach handles a large number of files better, because it keeps the lineage short:

def merge2(spark: SparkSession, paths: Seq[String]): DataFrame = {
  import spark.implicits._

  // Cast each file's myField to double, drop down to an RDD[Double], and union the
  // RDDs through the SparkContext so the lineage stays flat.
  spark.sparkContext.union(paths.par.map { path =>
    spark.read.parquet(path).withColumn("myField", $"myField".cast(DoubleType)).as[Double].rdd
  }.toList).toDF("myField") // name the resulting column to match the original field
}
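
spark.sparkContext.union builds a single UnionRDD over all of the inputs in one step, rather than the chain of pairwise DataFrame unions that reduce(_.union(_)) produces, which is why the lineage stays short as the file count grows. It is called the same way as merge above; a sketch reusing the same hypothetical paths:

val mergedLarge = merge2(spark, Seq(
  basePath + "intFile.snappy.parquet",
  basePath + "doubleFile.snappy.parquet"))
mergedLarge.select("myField").show()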
