scala - How do I specify a schema when loading a CSV from S3 in Spark with Scala?
Question
I have searched Stack Overflow for several syntax variations, but none of them worked for me. My code is below:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}

val schema1 = (new StructType)
  .add("PASSENGERID", IntegerType, true)
  .add("PCLASS", IntegerType, true)
  .add("NAME", IntegerType, true)
  .add("SEX", StringType, true)
  .add("AGE", DoubleType, true)
  .add("SIBSP", IntegerType, true)
  .add("PARCH", IntegerType, true)
  .add("TICKET", StringType, true)
  .add("FARE", DoubleType, true)
  .add("CABIN", StringType, true)
  .add("EMBARKED", StringType, true)

val schema2 = StructType(
  StructField("PASSENGERID", IntegerType, true) ::
  StructField("PCLASS", IntegerType, true) ::
  StructField("NAME", IntegerType, true) ::
  StructField("SEX", StringType, true) ::
  StructField("AGE", DoubleType, true) ::
  StructField("SIBSP", IntegerType, true) ::
  StructField("PARCH", IntegerType, true) ::
  StructField("TICKET", StringType, true) ::
  StructField("FARE", DoubleType, true) ::
  StructField("CABIN", StringType, true) ::
  StructField("EMBARKED", StringType, true) :: Nil)

val schema3 = StructType(Array(
  StructField("PASSENGERID", IntegerType, true),
  StructField("PCLASS", IntegerType, true),
  StructField("NAME", IntegerType, true),
  StructField("SEX", StringType, true),
  StructField("AGE", DoubleType, true),
  StructField("SIBSP", IntegerType, true),
  StructField("PARCH", IntegerType, true),
  StructField("TICKET", StringType, true),
  StructField("FARE", DoubleType, true),
  StructField("CABIN", StringType, true),
  StructField("EMBARKED", StringType, true)))

val schema4 = StructType(Seq(
  StructField("PASSENGERID", IntegerType, true),
  StructField("PCLASS", IntegerType, true),
  StructField("NAME", IntegerType, true),
  StructField("SEX", StringType, true),
  StructField("AGE", DoubleType, true),
  StructField("SIBSP", IntegerType, true),
  StructField("PARCH", IntegerType, true),
  StructField("TICKET", StringType, true),
  StructField("FARE", DoubleType, true),
  StructField("CABIN", StringType, true),
  StructField("EMBARKED", StringType, true)
))

val schema5 = StructType(
  List(
    StructField("PASSENGERID", IntegerType, true),
    StructField("PCLASS", IntegerType, true),
    StructField("NAME", IntegerType, true),
    StructField("SEX", StringType, true),
    StructField("AGE", DoubleType, true),
    StructField("SIBSP", IntegerType, true),
    StructField("PARCH", IntegerType, true),
    StructField("TICKET", StringType, true),
    StructField("FARE", DoubleType, true),
    StructField("CABIN", StringType, true),
    StructField("EMBARKED", StringType, true)
  )
)

/*
val df = spark.read
  .option("header", true)
  .csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
  .schema(schema)
*/
//this works
val df = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df.show(false)
df.printSchema()
//fun errors
val df1 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema1)
val df2 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema2)
val df3 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema3)
val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
val df5 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema5)
The data is the Kaggle Titanic survival set, with uppercase field names in the header. I have tried submitting this as a script with spark-shell -i as well as running the commands manually inside spark-shell. spark-shell -i spits out syntax errors at the dfX reads; if I load the schemas manually they all look fine, and the reads all fail with the same error.
scala> val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
<console>:26: error: overloaded method value apply with alternatives:
(fieldIndex: Int)org.apache.spark.sql.types.StructField <and>
(names: Set[String])org.apache.spark.sql.types.StructType <and>
(name: String)org.apache.spark.sql.types.StructField
cannot be applied to (org.apache.spark.sql.types.StructType)
val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
I don't understand what I'm doing wrong. I'm using Spark version 2.4.4 on AWS EMR.
Answer
Set the inferSchema option to false so that Spark does not infer the schema while loading the data, and move your .schema call before .csv. The .csv call already returns a DataFrame, and a DataFrame has no schema method that accepts a StructType, which is why those calls fail.
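A minimal sketch of what is actually happening in the failing calls (assuming Spark 2.4's types API): on a DataFrame, schema is a value holding the current StructType, so .schema(schema4) applies that StructType to schema4, dispatching to StructType.apply, which only accepts a field name, a field index, or a Set of names.

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// A StructType's apply method looks fields up; it does not replace a schema.
val st = StructType(Seq(
  StructField("PASSENGERID", IntegerType, true),
  StructField("SEX", StringType, true)
))

st("SEX")   // valid: apply(name: String) returns the matching StructField
st(0)       // valid: apply(fieldIndex: Int) returns the field at that index
// st(st)   // does not compile: no apply overload accepts a StructType --
//          // exactly the "overloaded method value apply" error shown above
```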
Please check the following code.
scala> val df1 = spark.read.option("header", true).option("inferSchema", false).schema(schema1).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df1: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
scala> val df2 = spark.read.option("header", true).option("inferSchema", false).schema(schema2).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df2: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
scala> val df3 = spark.read.option("header", true).option("inferSchema", false).schema(schema3).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df3: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
scala> val df4 = spark.read.option("header", true).option("inferSchema", false).schema(schema4).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df4: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
scala> val df5 = spark.read.option("header", true).option("inferSchema", false).schema(schema5).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df5: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
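One more thing worth checking once the reads succeed (a sketch, assuming Spark 2.4's CSV reader behavior): with header=true and a user-supplied schema, Spark binds columns to schema fields by position rather than by header name, controlled by the enforceSchema option (true by default), so it is worth confirming the loaded DataFrame carries exactly the schema you passed.

```scala
// Sanity-check the explicit schema after loading. enforceSchema=true (the
// default) means Spark ignores header names and binds columns by position,
// only warning if the header does not match the schema's field names.
val dfChecked = spark.read
  .option("header", true)
  .option("enforceSchema", true)  // set false to fail fast on a header/schema name mismatch
  .schema(schema1)
  .csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")

// The reader should report exactly the schema that was passed in.
assert(dfChecked.schema == schema1)
dfChecked.printSchema()
```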