apache-spark - 在 spark scala 中动态创建 CSV 文件头
问题描述
我们从供应商处收到 2 个文件(数据文件和元数据文件)用于数据摄取。
Vendor 1
data file format
user_id has_insurance postal_code city
101 Y 20001 Newyork
102 N 40001 Boston
metadata file format
user_id,String
has_insurance,Boolean
postal_code,String
city, String
我们将从另一个供应商处收到相同的数据字段,但数据文件中的字段顺序可能会有所不同,如下所示
Vendor 2
data file format
user_id postal_code city has_insurance
101 20001 Newyork Y
102 40001 Boston N
metadata file format
user_id,String
postal_code,String
city, String
has_insurance,Boolean
元数据文件将包含字段顺序。读取 CSV 文件时是否可以根据元数据文件动态分配模式?
//函数为给定的字段数据类型派生火花数据类型
def strToDataType(str: String): DataType = {
| if (str == "String") StringType
| else if (str == "Boolean") BooleanType
| else StringType
| }
val metadataDf = spark.sqlContext.textFile("metadata_folder")
val headerSchema = StructType(metadataDf.map(_.split(",")).map(x => StructField(x(0),strToDataType(x(1)),true)))
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.schema(headerSchema) // defining based on the custom schema
.load("data_file.csv")
val headerSchema =
StructType(bxfd.map(_.split(",")).map(x => StructField(x(0),strToDataType(x(1)),true)))
当我尝试使用上述命令动态创建架构时,出现以下错误。您能否提一些建议。
<console>:34: error: overloaded method value apply with alternatives:
(fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.types.StructField])
解决方案
在这种情况下你不能使用 CSV 数据源,但是你可以自己解析它,没有太大的困难。基本步骤是将元数据文件作为一个文本块读取,然后数据文件将数据行与标题行分开并应用架构。
我对你的分隔符做了一些假设。要记住的一件事是,一旦加载 DataFrame 中的行,您不应该对其进行任何排序,因为您不知道哪些数据正在加载到哪个 worker 上。这会尝试根据内容识别数据行和元数据行,因此您可能需要调整这些规则。
def loadVendor(dataPath: String, metaPath: String): DataFrame = {
val df = spark.read.text(path)
// First read the metadata file, wholeTextFiles lets us get it all
// as a single string we so we can parse locally
val metaText = sc.wholeTextFiles(metaPath).first._2
val metaLines = metaText.
split("\n").
map(_.split(","))
// Identify header as line that has all the field names
val fields = Seq("user_id", "has_insurance", "postal_code", "city")
val headerDf = fields.foldLeft(df)((df2, fname) => df2.filter($"value".contains(fname)))
val headerLine = headerDf.first.getString(0)
val header = headerLine.split(" ")
// Identify data rows as ones that aren't special lines or the header
var data = df.
filter($"value" =!= headerLine).
filter(!$"value".startsWith("Vendor 1")).
filter(!$"value".startsWith("data file format"))
// Split the data fields on separator and assign column names. Assumed any whitespace is your separator
val rows = data.select(split($"value", raw"\W+") as "fields")
val named = header.zipWithIndex.map( { case (f, idx) => $"fields".getItem(idx).alias(f)} )
val table = rows.select(named:_*)
// Cast to the right types
val castCols = metaLines.map { case Array(cname, ctype) => col(cname).cast(ctype) }
val typed = table.select(castCols:_*)
// Return the columns sorted by name, so union'ing multiple DF's will line up
typed.select(table.columns.sorted.map(col):_*)
}
这是打印的数据
scala> df.show
+-------+-------------+-----------+-------+
| city|has_insurance|postal_code|user_id|
+-------+-------------+-----------+-------+
|Newyork| true| 20001| 101|
| Boston| false| 40001| 102|
+-------+-------------+-----------+-------+
这是打印的架构
scala> df.printSchema
root
|-- city: string (nullable = true)
|-- has_insurance: boolean (nullable = true)
|-- postal_code: string (nullable = true)
|-- user_id: string (nullable = true)
推荐阅读
- java - 确保我的 servlet 过滤器在其他动态添加的过滤器之前运行
- python - Python 绘制缺失数据
- angular - 如何测试包含自定义表单控件的组件?
- java - 是否可以在运行时从 String 执行代码?
- sql-server - MS SQL Server 慢速网络性能 (ASYNC_NETWORK_IO) 与快速网络 10Gbit
- r - 零膨胀高斯分布模型
- jhipster - jhipster 错误 403 请求中未包含有效的面包屑
- php - 我想使用 PHP 7.1.5 从 MySql 数据库中获取任何特定字段,但它不起作用
- python - 基于字典键合并列表
- c# - ASP.NET Core 在构建过程中设置托管环境