Convert Flat Data to Nested Objects Using Spark Scala

Problem Description

I need help converting a flat dataset into a nested format using Apache Spark / Scala.

Is it possible to automatically create a nested structure derived from the input column namespaces, i.e. [level1].[level2]? In my example, the nesting level is determined by the period character "." within the column headers.

I assume this can be achieved using a map function. I am open to alternative solutions, particularly if there is a more elegant way to achieve the same result.

package org.acme.au

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object testNestedObject extends App {

  // Configure spark
  val spark = SparkSession.builder()
    .appName("Spark batch demo")
    .master("local[*]")
    .config("spark.driver.host", "localhost")
    .getOrCreate()

  // Start spark
  val sc = spark.sparkContext
  sc.setLogLevel("ERROR")

  // Define schema for input data
  val flatSchema = new StructType()
    .add(StructField("id", StringType, false))
    .add(StructField("name", StringType, false))
    .add(StructField("custom_fields.fav_colour", StringType, true))
    .add(StructField("custom_fields.star_sign", StringType, true))

  // Create a row with dummy data
  val row1 = Row("123456", "John Citizen", "Blue", "Scorpio")
  val row2 = Row("990087", "Jane Simth", "Green", "Taurus")
  val flatData = Seq(row1, row2)

  // Convert into dataframe
  val dfIn = spark.createDataFrame(spark.sparkContext.parallelize(flatData), flatSchema)

  // Print to console
  dfIn.printSchema()
  dfIn.show()

  // Convert flat data into nested structure as either Parquet or JSON format
  val dfOut = dfIn.rdd
    .map(
      row => ( /* TODO: Need help with mapping flat data to nested structure derived from input column namespaces
           * 
           * For example:
           * 
           * <id>123456</id>
           * <name>John Citizen</name>
           * <custom_fields>
           *   <fav_colour>Blue</fav_colour>
           *   <star_sign>Scorpio</star_sign>
           * </custom_fields>
           * 
           */ ))

  // Stop spark
  spark.stop()

}

Tags: scala, apache-spark, apache-spark-sql

Solution


This can be solved with a dedicated case class and a UDF that converts the input data into case class instances. For example:

Define the case class:

case class NestedFields(fav_colour: String, star_sign: String)

Define a UDF that takes the original column values as input and returns an instance of NestedFields:

private val asNestedFields = udf((fc: String, ss: String) => NestedFields(fc, ss))
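
For this snippet to compile, udf and the $ column syntax must be in scope; assuming the SparkSession is named spark as in the question:

import org.apache.spark.sql.functions.udf
import spark.implicits._ // enables the $"colName" column syntax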

Transform the original DataFrame and drop the flattened columns:

val res = dfIn.withColumn("custom_fields", asNestedFields($"`custom_fields.fav_colour`", $"`custom_fields.star_sign`"))
              .drop($"`custom_fields.fav_colour`")
              .drop($"`custom_fields.star_sign`")

It produces:

root
|-- id: string (nullable = false)
|-- name: string (nullable = false)
|-- custom_fields: struct (nullable = true)
|    |-- fav_colour: string (nullable = true)
|    |-- star_sign: string (nullable = true)

+------+------------+---------------+
|    id|        name|  custom_fields|
+------+------------+---------------+
|123456|John Citizen|[Blue, Scorpio]|
|990087|  Jane Smith|[Green, Taurus]|
+------+------------+---------------+
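
Since the original goal was to persist the result as Parquet or JSON, either writer preserves the nested struct as-is. A quick way to confirm the shape (the output paths below are placeholders):

res.toJSON.show(false)
// {"id":"123456","name":"John Citizen","custom_fields":{"fav_colour":"Blue","star_sign":"Scorpio"}}

res.write.mode("overwrite").parquet("/tmp/nested_parquet")
res.write.mode("overwrite").json("/tmp/nested_json")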
