How to efficiently read source data with a custom schema containing complex fields?

Problem Description

Suppose I have the following comma-separated raw source data, but some fields have very custom formats. For simplicity, I have reduced the example to 3 fields/columns. Here the custom field is `address`, which has a special format (key/value pairs enclosed in square brackets). There may be other fields with completely different formats.

Bob,35,[street:75917;city:new york city;state:ny;zip:10000]
...
Roger,75,[street:81659;city:los angeles;state:ca;zip:99999]

Case classes:

case class Person(name: String, age: Int, address: Address)
case class Address(street: String, city: String, state: String, zip: Int)

What is the most efficient way to process the source data, including parsing the address field, into a `Dataset[Person]`?

Currently, I see two options:

Option 1 - perform a manual row-by-row transformation:

// requires: import spark.implicits._ (for the Person encoder)
val df = spark.read.csv(source)
val dataset = df.map(row =>
    Person(row.getAs[String]("_c0"), row.getAs[String]("_c1").toInt, getAddress(row.getAs[String]("_c2")))
)
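For reference, the `getAddress` helper is not defined in the question; a minimal plain-Scala sketch of what it might look like (a hypothetical implementation, parsing the bracketed key/value format; `Address` is repeated here so the snippet is self-contained):

```scala
case class Address(street: String, city: String, state: String, zip: Int)

// Hypothetical parser for the bracketed format, e.g.
// "[street:75917;city:new york city;state:ny;zip:10000]"
def getAddress(raw: String): Address = {
  val kv = raw
    .stripPrefix("[").stripSuffix("]")     // drop the surrounding brackets
    .split(";")                            // one key:value pair per segment
    .map { pair =>
      val Array(k, v) = pair.split(":", 2) // split at the first ':' only
      k -> v
    }
    .toMap
  Address(kv("street"), kv("city"), kv("state"), kv("zip").toInt)
}
```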

Option 2 - use a UDF (user-defined function) for the custom-formatted column, together with `withColumn` and `withColumnRenamed`:

val udfAddress: UserDefinedFunction = udf((address: String) => toAddressObject(address))
var df = spark.read.csv(source)
df = df.withColumnRenamed("_c0", "name").withColumn("name", col("name").cast(StringType))
       .withColumnRenamed("_c1", "age").withColumn("age", col("age").cast(IntegerType))
       .withColumnRenamed("_c2", "address").withColumn("address", udfAddress(col("address")))
val dataset = df.as[Person]

In general, which of Option 1 and Option 2 is more efficient, and why? Also, if there is another option that handles/parsing custom-formatted fields more efficiently, I am open to it. Would a better option involve manually composing `StructType` and `StructField`s? Thanks!

Tags: scala, apache-spark, apache-spark-sql, distributed-computing, apache-spark-dataset

Solution


One option could be the following.

Note that I have not done any performance testing.

Load the test data

// requires: import spark.implicits._ (for toDS())
val data =
  """
    |Bob,35,[street:75917;city:new york city;state:ny;zip:10000]
    |Roger,75,[street:81659;city:los angeles;state:ca;zip:99999]
  """.stripMargin

// Re-delimit with "|"; each record has exactly 3 columns and the
// address field itself contains no commas, so splitting on "," is safe
val stringDS = data.split(System.lineSeparator())
  .map(_.split(",").map(_.trim).mkString("|"))
  .toSeq.toDS()
val df = spark.read
  .option("sep", "|")
  .option("inferSchema", "true")
  .csv(stringDS)
df.show(false)
df.printSchema()
    /**
      * +-----+---+----------------------------------------------------+
      * |_c0  |_c1|_c2                                                 |
      * +-----+---+----------------------------------------------------+
      * |Bob  |35 |[street:75917;city:new york city;state:ny;zip:10000]|
      * |Roger|75 |[street:81659;city:los angeles;state:ca;zip:99999]  |
      * +-----+---+----------------------------------------------------+
      *
      * root
      * |-- _c0: string (nullable = true)
      * |-- _c1: integer (nullable = true)
      * |-- _c2: string (nullable = true)
      */

Convert the DataFrame of rows into `Person`

// requires:
//   import org.apache.spark.sql.catalyst.ScalaReflection
//   import org.apache.spark.sql.types.StructType
//   import org.apache.spark.sql.functions._
//   import spark.implicits._
val person = ScalaReflection.schemaFor[Person].dataType.asInstanceOf[StructType]
val toAddr = udf((map: Map[String, String]) =>
  Address(map("street"), map("city"), map("state"), map("zip").toInt))
val p = df.withColumn("_c2", translate($"_c2", "[]", ""))  // strip the brackets
  .withColumn("_c2", expr("str_to_map(_c2, ';', ':')"))    // "k:v;k:v" -> Map
  .withColumn("_c2", toAddr($"_c2"))                       // Map -> Address struct
  .toDF(person.map(_.name): _*)                            // rename _c0.. to name, age, address
  .as[Person]

p.show(false)
p.printSchema()

    /**
      * +-----+---+---------------------------------+
      * |name |age|address                          |
      * +-----+---+---------------------------------+
      * |Bob  |35 |[75917, new york city, ny, 10000]|
      * |Roger|75 |[81659, los angeles, ca, 99999]  |
      * +-----+---+---------------------------------+
      *
      * root
      * |-- name: string (nullable = true)
      * |-- age: integer (nullable = true)
      * |-- address: struct (nullable = true)
      * |    |-- street: string (nullable = true)
      * |    |-- city: string (nullable = true)
      * |    |-- state: string (nullable = true)
      * |    |-- zip: integer (nullable = false)
      */
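As a sanity check on the `translate` + `str_to_map` steps above, the same parsing can be reproduced in plain Scala, independent of Spark (a sketch, not part of the original answer):

```scala
// Mirrors translate(_c2, "[]", "") followed by str_to_map(_c2, ';', ':'):
// remove the bracket characters, split pairs on ';', then split each
// pair into key and value at the first ':'.
def strToMap(raw: String): Map[String, String] =
  raw
    .filterNot(c => c == '[' || c == ']')
    .split(";")
    .map { seg =>
      val Array(k, v) = seg.split(":", 2)
      k -> v
    }
    .toMap
```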
