scala - 使用 Spark Scala 将平面数据转换为嵌套对象
问题描述
我需要使用 Apache Spark / Scala 将平面数据集转换为嵌套格式的帮助。
是否可以自动创建从输入列命名空间派生的嵌套结构
[级别 1]。[2级] ? 在我的示例中,嵌套级别由句点符号“。”确定。在列标题内。
我假设这可以使用地图功能来实现。我对替代解决方案持开放态度,特别是如果有更优雅的方式来实现相同的结果。
package org.acme.au
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import scala.collection.Seq
object testNestedObject extends App {
// Configure spark
val spark = SparkSession.builder()
.appName("Spark batch demo")
.master("local[*]")
.config("spark.driver.host", "localhost")
.getOrCreate()
// Start spark
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
// Define schema for input data
val flatSchema = new StructType()
.add(StructField("id", StringType, false))
.add(StructField("name", StringType, false))
.add(StructField("custom_fields.fav_colour", StringType, true))
.add(StructField("custom_fields.star_sign", StringType, true))
// Create a row with dummy data
val row1 = Row("123456", "John Citizen", "Blue", "Scorpio")
val row2 = Row("990087", "Jane Simth", "Green", "Taurus")
val flatData = Seq(row1, row2)
// Convert into dataframe
val dfIn = spark.createDataFrame(spark.sparkContext.parallelize(flatData), flatSchema)
// Print to console
dfIn.printSchema()
dfIn.show()
// Convert flat data into nested structure as either Parquet or JSON format
val dfOut = dfIn.rdd
.map(
row => ( /* TODO: Need help with mapping flat data to nested structure derived from input column namespaces
*
* For example:
*
* <id>12345<id>
* <name>John Citizen</name>
* <custom_fields>
* <fav_colour>Blue</fav_colour>
* <star_sign>Scorpio</star_sign>
* </custom_fields>
*
*/ ))
// Stop spark
sc.stop()
}
解决方案
这可以通过将输入数据转换为案例类实例的专用case class
和 a来解决。UDF
例如:
定义案例类
case class NestedFields(fav_colour: String, star_sign: String)
定义将原始列值作为输入并返回 的实例的 UDF NestedFields
:
private val asNestedFields = udf((fc: String, ss: String) => NestedFields(fc, ss))
转换原始 DataFrame 并删除扁平列:
val res = dfIn.withColumn("custom_fields", asNestedFields($"`custom_fields.fav_colour`", $"`custom_fields.star_sign`"))
.drop($"`custom_fields.fav_colour`")
.drop($"`custom_fields.star_sign`")
它产生
root
|-- id: string (nullable = false)
|-- name: string (nullable = false)
|-- custom_fields: struct (nullable = true)
| |-- fav_colour: string (nullable = true)
| |-- star_sign: string (nullable = true)
+------+------------+---------------+
| id| name| custom_fields|
+------+------------+---------------+
|123456|John Citizen|[Blue, Scorpio]|
|990087| Jane Simth|[Green, Taurus]|
+------+------------+---------------+
推荐阅读
- typescript - 录音机在 react-native-audio-recorder-player 中不起作用
- mysql - 我将如何制作不高于 5 的双倍?
- regex - 用于逗号或/和空格分隔的正则表达式
- session-cookies - Katalon 无法发送请求
- python - 在 Python 中使用 if 语句进行字符串检查
- android - 如何确保回收站视图适配器项目占据整个屏幕高度
- r - 将 ShinyWebApp 部署到 rstudio-connect 期间的问题
- sdn - 是否可以从 ODL 控制器更改 Mininet 拓扑中 OVS 交换机的配置?
- python - 我可以在窗口上获取注册表值的修改时间吗?
- reactjs - 每次打开应用程序时,如何在本机反应中创建自动背景图像更改?