How do I specify a schema when loading a CSV from S3 in Spark with Scala?

Problem description

I have searched Stack Overflow and tried several syntax variations, but none of them worked for me. My code is as follows:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType};

val schema1 = (new StructType)
    .add("PASSENGERID", IntegerType, true)
    .add("PCLASS", IntegerType, true)
    .add("NAME", IntegerType, true)
    .add("SEX", StringType, true)
    .add("AGE", DoubleType, true)
    .add("SIBSP", IntegerType, true)
    .add("PARCH", IntegerType, true)
    .add("TICKET", StringType, true)
    .add("FARE", DoubleType, true)
    .add("CABIN", StringType, true)
    .add("EMBARKED", StringType, true)

val schema2 = StructType(
    StructField("PASSENGERID", IntegerType, true) ::
    StructField("PCLASS", IntegerType, true) ::
    StructField("NAME", IntegerType, true) ::
    StructField("SEX", StringType, true) ::
    StructField("AGE", DoubleType, true) ::
    StructField("SIBSP", IntegerType, true) ::
    StructField("PARCH", IntegerType, true) ::
    StructField("TICKET", StringType, true) ::
    StructField("FARE", DoubleType, true) ::
    StructField("CABIN", StringType, true) ::
    StructField("EMBARKED", StringType, true) :: Nil)

val schema3 = StructType(Array(
    StructField("PASSENGERID", IntegerType, true),
    StructField("PCLASS", IntegerType, true),
    StructField("NAME", IntegerType, true),
    StructField("SEX", StringType, true),
    StructField("AGE", DoubleType, true),
    StructField("SIBSP", IntegerType, true),
    StructField("PARCH", IntegerType, true),
    StructField("TICKET", StringType, true),
    StructField("FARE", DoubleType, true),
    StructField("CABIN", StringType, true),
    StructField("EMBARKED", StringType, true)))

val schema4 = StructType(Seq(
    StructField("PASSENGERID", IntegerType, true),
    StructField("PCLASS", IntegerType, true),
    StructField("NAME", IntegerType, true),
    StructField("SEX", StringType, true),
    StructField("AGE", DoubleType, true),
    StructField("SIBSP", IntegerType, true),
    StructField("PARCH", IntegerType, true),
    StructField("TICKET", StringType, true),
    StructField("FARE", DoubleType, true),
    StructField("CABIN", StringType, true),
    StructField("EMBARKED", StringType, true)
))

val schema5 = StructType(
  List(
    StructField("PASSENGERID", IntegerType, true),
    StructField("PCLASS", IntegerType, true),
    StructField("NAME", IntegerType, true),
    StructField("SEX", StringType, true),
    StructField("AGE", DoubleType, true),
    StructField("SIBSP", IntegerType, true),
    StructField("PARCH", IntegerType, true),
    StructField("TICKET", StringType, true),
    StructField("FARE", DoubleType, true),
    StructField("CABIN", StringType, true),
    StructField("EMBARKED", StringType, true)
  )
)

/*
val df = spark.read
    .option("header", true)
    .csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
    .schema(schema)
*/

//this works
val df = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")

df.show(false)
df.printSchema()

//fun errors
val df1 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema1)
val df2 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema2)
val df3 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema3)
val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
val df5 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema5)

The data is the Kaggle Titanic survival set, with the header fields in upper case. I have tried both submitting this as a script to spark-shell and running the commands manually inside spark-shell. With spark-shell -i, the dfX reads spit out syntax errors; if I run things manually, every schema definition loads fine, but all of the reads fail with the same error.

scala> val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
<console>:26: error: overloaded method value apply with alternatives:
  (fieldIndex: Int)org.apache.spark.sql.types.StructField <and>
  (names: Set[String])org.apache.spark.sql.types.StructType <and>
  (name: String)org.apache.spark.sql.types.StructField
 cannot be applied to (org.apache.spark.sql.types.StructType)
       val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
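
The overloads listed in that message belong to StructType, not to DataFrameReader: on an already-loaded DataFrame, df.schema is a StructType value, and StructType's apply method only accepts a field index, a field name, or a set of names. A minimal sketch (schema names here are illustrative, not from the question):

```scala
import org.apache.spark.sql.types._

val st = StructType(Seq(StructField("A", IntegerType, true)))
st("A")  // ok: StructType.apply(name: String) returns the StructField
st(0)    // ok: StructType.apply(fieldIndex: Int) returns the StructField
// st(anotherStructType)  // does not compile: no apply overload takes a StructType,
//                        // which is exactly the error reported above
```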

I don't understand what I am doing wrong. I am using Spark version 2.4.4 on AWS EMR.

Tags: scala, csv, apache-spark

Solution


Set the inferSchema option to false so that Spark does not try to infer the schema while loading the data.

Move the .schema(...) call before .csv(...): schema(StructType) is a method on DataFrameReader, not on DataFrame. Once .csv(...) has run, you have a DataFrame, and df.schema is a StructType; writing df.schema(schema4) therefore tries to call StructType's apply method with a StructType argument, and none of its overloads (which take an Int, a String, or a Set[String]) matches. That is exactly the error you are seeing.

Please check the following code.

scala> val df1 = spark.read.option("header", true).option("inferSchema", false).schema(schema1).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df1: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]

scala> val df2 = spark.read.option("header", true).option("inferSchema", false).schema(schema2).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df2: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]

scala> val df3 = spark.read.option("header", true).option("inferSchema", false).schema(schema3).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df3: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]

scala> val df4 = spark.read.option("header", true).option("inferSchema", false).schema(schema4).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df4: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]

scala> val df5 = spark.read.option("header", true).option("inferSchema", false).schema(schema5).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df5: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
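
As a side note, since Spark 2.3 DataFrameReader.schema also accepts a DDL-formatted string, which avoids building a StructType by hand. A sketch using the same bucket path as above:

```scala
// schema(String) parses a comma-separated "name TYPE" DDL list into a StructType.
// Note: the question's schemas declare NAME as IntegerType; names are text,
// so STRING is used here instead.
val df = spark.read
  .option("header", true)
  .schema("PASSENGERID INT, PCLASS INT, NAME STRING, SEX STRING, AGE DOUBLE, " +
          "SIBSP INT, PARCH INT, TICKET STRING, FARE DOUBLE, CABIN STRING, EMBARKED STRING")
  .csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
```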
