scala - 如何使用具有复杂字段的自定义模式有效地触发读取源数据?
问题描述
假设我有以下由逗号分隔的原始源数据,,
但有一些X
字段具有非常自定义的格式。为简单起见,我已将此示例最小化为 3 个字段/列。在这种情况下,自定义字段是address
具有特殊格式的(用大括号括起来的键/值)。可能还有其他格式完全不同的字段。
Bob,35,[street:75917;city:new york city;state:ny;zip:10000]
...
Roger,75,[street:81659;city:los angeles;state:ca;zip:99999]
案例类:
case class Person(name: String, age: Int, address: Address)
case class Address(street: String, city: String, state: String, zip: Int)
将源数据(包括地址字段的解析)处理成最有效的方法是什么Dataset[Person]
?
目前,有两种选择:
选项 1 - 执行逐行手动转换:
val df = df.read.csv(source)
val dataset = df.map(row =>
Person(row.getString("_c0"), row.getInt("_c1"), getAddress(row.getString("_c3")))
).as[Person]
选项 2 -UDF
为自定义格式的列使用(用户定义的函数)并使用withColumn
andwithColumnRenamed
:
val udfAddress : UserDefinedFunction = udf((address: String) => toAddressObject(address))
var df = df.read.csv(source)
df = df.withColumnRenamed("_c0", "name").withColumn("name", col("name").cast(StringType))
.withColumnRenamed("_c1", "age").withColumn("age", col("age").cast(IntegerType))
.withColumnRenamed("_c2", "address").withColumn("address", udfAddress(col("address")))
val dataset = df.as[Person]
一般来说,在选项 1和选项 2之间,什么更有效,为什么?此外,如果有另一个选项在处理/解析自定义格式字段方面更有效,我也愿意接受其他选项。是否有更好的选择涉及手动组合 StructType 和 StructFields?谢谢!
解决方案
一种选择可能是-
请注意,我没有进行任何性能测试
加载测试数据
val data =
"""
|Bob,35,[street:75917;city:new york city;state:ny;zip:10000]
|Roger,75,[street:81659;city:los angeles;state:ca;zip:99999]
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\\,").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString("|"))
.toSeq.toDS()
val df = spark.read
.option("sep", "|")
.option("inferSchema", "true")
// .option("header", "true")
// .option("nullValue", "null")
.csv(stringDS)
df.show(false)
df.printSchema()
/**
* +-----+---+----------------------------------------------------+
* |_c0 |_c1|_c2 |
* +-----+---+----------------------------------------------------+
* |Bob |35 |[street:75917;city:new york city;state:ny;zip:10000]|
* |Roger|75 |[street:81659;city:los angeles;state:ca;zip:99999] |
* +-----+---+----------------------------------------------------+
*
* root
* |-- _c0: string (nullable = true)
* |-- _c1: integer (nullable = true)
* |-- _c2: string (nullable = true)
*/
将行的数据框转换为人
val person = ScalaReflection.schemaFor[Person].dataType.asInstanceOf[StructType]
val toAddr = udf((map: Map[String, String]) => Address(map("street"), map("city"), map("state"), map("zip").toInt))
val p = df.withColumn("_c2", translate($"_c2", "[]",""))
.withColumn("_c2", expr("str_to_map(_c2, ';', ':')"))
.withColumn("_c2", toAddr($"_c2"))
.toDF(person.map(_.name): _*)
.as[Person]
p.show(false)
p.printSchema()
/**
* +-----+---+---------------------------------+
* |name |age|address |
* +-----+---+---------------------------------+
* |Bob |35 |[75917, new york city, ny, 10000]|
* |Roger|75 |[81659, los angeles, ca, 99999] |
* +-----+---+---------------------------------+
*
* root
* |-- name: string (nullable = true)
* |-- age: integer (nullable = true)
* |-- address: struct (nullable = true)
* | |-- street: string (nullable = true)
* | |-- city: string (nullable = true)
* | |-- state: string (nullable = true)
* | |-- zip: integer (nullable = false)
*/
推荐阅读
- sql-server - 一个 SQL Server 查询,将每个人的养老基金的价值增加一个月的缴款价值
- elasticsearch - 弹性搜索如何分析带有“-”的 URL/单词
- android - Android:FileObserver 不会在某些手机上触发 OPEN 事件
- windows - Windows CMD:如何使用模式递归删除文件夹
- java - Apache Camel:引号会破坏 URI
- android - Android 社交网络帐户共享
- outlook-addin - Outlook 加载项按钮在 20 秒后出现在功能区上
- java - 无法从大小为 2 的集合中获取索引为 2 的元素
- python - Python Subprocess 有效地检查子进程是否终止
- php - 如何使用函数 yii2 创建 url