Dynamically create a CSV file header in Spark Scala

Problem description

We receive 2 files from each vendor (a data file and a metadata file) for data ingestion.

Vendor 1 
data file format
user_id has_insurance postal_code city
101        Y        20001       Newyork
102        N        40001       Boston
metadata file format
user_id,String
has_insurance,Boolean
postal_code,String
city, String

We will receive the same data fields from another vendor, but the field order in the data file may differ, as shown below:

Vendor 2
data file format
user_id  postal_code   city     has_insurance
101        20001       Newyork  Y
102        40001       Boston   N
metadata file format
user_id,String
postal_code,String
city, String
has_insurance,Boolean

The metadata file contains the field order. Is it possible to assign the schema dynamically from the metadata file when reading the CSV file?

// Function to derive the Spark data type for a given field type string

def strToDataType(str: String): DataType = {
  if (str == "String") StringType
  else if (str == "Boolean") BooleanType
  else StringType
}

val metadataDf = spark.sparkContext.textFile("metadata_folder")
val headerSchema = StructType(metadataDf.map(_.split(",")).map(x => StructField(x(0),strToDataType(x(1)),true)))

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .schema(headerSchema)     // defining based on the custom schema
    .load("data_file.csv")


val headerSchema =
StructType(metadataDf.map(_.split(",")).map(x => StructField(x(0),strToDataType(x(1)),true)))

When I try to build the schema dynamically with the command above, I get the following error. Could you offer some suggestions?

<console>:34: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.types.StructField])
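
The error occurs because StructType expects a local collection of StructField (an Array, Seq or java.util.List, as the overloads in the message show), whereas calling map on an RDD yields an RDD[StructField] that still lives on the cluster. Since the metadata file is small, one way to resolve the type mismatch is to collect it to the driver first; a minimal sketch, reusing the strToDataType helper above:

val headerSchema = StructType(
  spark.sparkContext.textFile("metadata_folder")
    .map(_.split(",").map(_.trim))   // trim handles entries like "city, String"
    .collect()                       // bring the handful of metadata lines to the driver
    .map(x => StructField(x(0), strToDataType(x(1)), nullable = true)))

This only fixes the schema construction; as the solution below notes, the whitespace-separated data files still cannot be read directly by the CSV data source, so the answer parses them by hand.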

Tags: apache-spark

Solution


You cannot use the CSV data source in this case, but you can parse the files yourself without much difficulty. The basic steps are to read the metadata file as a single block of text, and then, for the data file, to separate the data rows from the header row and apply the schema.

I have made some assumptions about your delimiters. One thing to keep in mind is that once the rows are loaded into a DataFrame you should not rely on any particular ordering, because you do not know which data ends up on which worker. The code tries to identify data rows and metadata rows by their content, so you may need to adjust those rules.

// Assumes a spark-shell style session where `spark` (SparkSession) and `sc` (SparkContext) are in scope
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, split}
import spark.implicits._

def loadVendor(dataPath: String, metaPath: String): DataFrame = {
    val df = spark.read.text(dataPath)

    // First read the metadata file, wholeTextFiles lets us get it all
    // as a single string we so we can parse locally
    val metaText = sc.wholeTextFiles(metaPath).first._2
    val metaLines = metaText.
        split("\n").
        map(_.split(",").map(_.trim))   // trim handles entries like "city, String"

    // Identify header as line that has all the field names
    val fields = Seq("user_id", "has_insurance", "postal_code", "city")
    val headerDf = fields.foldLeft(df)((df2, fname) => df2.filter($"value".contains(fname)))
    val headerLine = headerDf.first.getString(0)
    val header = headerLine.split(" ")

    // Identify data rows as ones that aren't special lines or the header
    val data = df.
        filter($"value" =!= headerLine).
        filter(!$"value".startsWith("Vendor 1")).
        filter(!$"value".startsWith("data file format"))

    // Split the data fields on separator and assign column names. Assumed any whitespace is your separator
    val rows = data.select(split($"value", raw"\W+") as "fields")

    val named = header.zipWithIndex.map( { case (f, idx) => $"fields".getItem(idx).alias(f)} )
    val table = rows.select(named:_*)

    // Cast to the right types
    val castCols = metaLines.map { case Array(cname, ctype) => col(cname).cast(ctype) }
    val typed = table.select(castCols:_*)

    // Return the columns sorted by name, so union'ing multiple DF's will line up
    typed.select(table.columns.sorted.map(col):_*)
}

Here is the printed data:

scala> df.show
+-------+-------------+-----------+-------+
|   city|has_insurance|postal_code|user_id|
+-------+-------------+-----------+-------+
|Newyork|         true|      20001|    101|
| Boston|        false|      40001|    102|
+-------+-------------+-----------+-------+

And here is the printed schema:

scala> df.printSchema
root
 |-- city: string (nullable = true)
 |-- has_insurance: boolean (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- user_id: string (nullable = true)
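
Because loadVendor returns its columns sorted by name, the DataFrames produced for different vendors line up and can be combined; a short sketch of how it might be called, with placeholder file paths:

val vendor1 = loadVendor("vendor1/data_file.csv", "vendor1/metadata_file.csv")
val vendor2 = loadVendor("vendor2/data_file.csv", "vendor2/metadata_file.csv")

// columns are in the same (sorted) order, so a positional union lines up
val combined = vendor1.union(vendor2)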
