Creating a nested-array DataFrame from an existing DataFrame

Problem description

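I am trying to create nested array-of-struct columns from a DataFrame during a join operation in Scala. The only thing I have been able to get working is an array whose schema shows an extra element struct level, and that level does not appear to be written in the JSON output.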

The schemas I am starting from are:

root
 |-- memberId: integer (nullable = false)
 |-- memberSubscriberId: integer (nullable = false)
 |-- memberIdSuffix: integer (nullable = false)
 |-- memberLastName: string (nullable = false)
 |-- memberFirstName: string (nullable = false)
 |-- memberMiddleInitial: string (nullable = false)
 |-- memberSocialSecurityNumber: string (nullable = false)
 |-- memberGender: string (nullable = false)
 |-- memberBirthDate: timestamp (nullable = false)
 |-- memberworkphonenumber: string (nullable = false)
 |-- memberworkphoneextensionnumber: string (nullable = false)
 |-- membercellphone: string (nullable = false)

root
 |-- memberSubscriberId: integer (nullable = false)
 |-- subscriberaddresstypecode: string (nullable = false)
 |-- lineOne: string (nullable = false)
 |-- lineTwo: string (nullable = false)
 |-- lineThree: string (nullable = false)
 |-- cityName: string (nullable = false)
 |-- stateCode: string (nullable = false)
 |-- zipCode: string (nullable = false)
 |-- countyCode: string (nullable = false)
 |-- countryCode: string (nullable = false)
 |-- subscriberphonenumber: string (nullable = false)
 |-- subscriberphoneextensionnumber: string (nullable = false)
 |-- subscriberfaxnumber: string (nullable = false)
 |-- subscriberfaxextensionnumber: string (nullable = false)
 |-- address: string (nullable = false)

And the schema I believe I should end up with is:

root
 |-- memberSubscriberId: integer (nullable = false)
 |-- memberId: integer (nullable = false)
 |-- memberIdSuffix: integer (nullable = false)
 |-- memberLastName: string (nullable = false)
 |-- memberFirstName: string (nullable = false)
 |-- memberMiddleInitial: string (nullable = false)
 |-- memberSocialSecurityNumber: string (nullable = false)
 |-- memberGender: string (nullable = false)
 |-- memberBirthDate: timestamp (nullable = false)
 |-- memberworkphonenumber: string (nullable = false)
 |-- memberworkphoneextensionnumber: string (nullable = false)
 |-- membercellphone: string (nullable = false)
 |-- memberAddresses: array (nullable = false)
 |    |-- lineOne: string (nullable = false)
 |    |-- lineTwo: string (nullable = false)
 |    |-- lineThree: string (nullable = false)
 |    |-- cityName: string (nullable = false)
 |    |-- stateCode: string (nullable = false)
 |    |-- zipCode: string (nullable = false)
 |    |-- countyCode: string (nullable = false)
 |    |-- countryCode: string (nullable = false)
 |-- memeberPhoneNumbers: array (nullable = false)
 |    |-- phoneNumber: string (nullable = false)
 |    |-- effectiveDate: null (nullable = true)
 |    |-- terminationDate: null (nullable = true)
 |    |-- isCurrent: null (nullable = true)
 |    |-- isActive: null (nullable = true)
 |    |-- telecomType: string (nullable = false)

Current code:

import org.apache.spark.sql.functions.{array, lit, struct}
import spark.implicits._

// mbrDF and addrDF are existing DataFrames with the two schemas shown above.

// Wrap each address row in a one-element array of structs, and build a
// two-element phone array (home and fax) from the subscriber columns.
val nestedAddr = addrDF.select(
  $"memberSubscriberId",
  array(
    struct(
      $"lineOne",
      $"lineTwo",
      $"lineThree",
      $"cityName",
      $"stateCode",
      $"zipCode",
      $"countyCode",
      $"countryCode"
    )
  ).as("memberAddresses"),
  array(
    struct(
      $"subscriberphonenumber".alias("phoneNumber"),
      //$"subscriberphoneextensionnumber"
      lit(null).alias("effectiveDate"),
      lit(null).alias("terminationDate"),
      lit(null).alias("isCurrent"),
      lit(null).alias("isActive"),
      lit("home").alias("telecomType")
    ),
    struct(
      $"subscriberfaxnumber".alias("phoneNumber"),
      //$"subscriberfaxextensionnumber".map(c => col(c).as("phoneNumber"))
      lit(null).alias("effectiveDate"),
      lit(null).alias("terminationDate"),
      lit(null).alias("isCurrent"),
      lit(null).alias("isActive"),
      lit("fax").alias("telecomType")
    )
  ).as("memeberPhoneNumbers")
)

// Join the nested columns onto the member rows by subscriber id.
val addrMbrDF = mbrDF.join(nestedAddr, Seq("memberSubscriberId"))

Resulting schema:

root
 |-- memberSubscriberId: integer (nullable = false)
 |-- memberId: integer (nullable = false)
 |-- memberIdSuffix: integer (nullable = false)
 |-- memberLastName: string (nullable = false)
 |-- memberFirstName: string (nullable = false)
 |-- memberMiddleInitial: string (nullable = false)
 |-- memberSocialSecurityNumber: string (nullable = false)
 |-- memberGender: string (nullable = false)
 |-- memberBirthDate: timestamp (nullable = false)
 |-- memberworkphonenumber: string (nullable = false)
 |-- memberworkphoneextensionnumber: string (nullable = false)
 |-- membercellphone: string (nullable = false)
 |-- memberAddresses: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- lineOne: string (nullable = false)
 |    |    |-- lineTwo: string (nullable = false)
 |    |    |-- lineThree: string (nullable = false)
 |    |    |-- cityName: string (nullable = false)
 |    |    |-- stateCode: string (nullable = false)
 |    |    |-- zipCode: string (nullable = false)
 |    |    |-- countyCode: string (nullable = false)
 |    |    |-- countryCode: string (nullable = false)
 |-- memeberPhoneNumbers: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- phoneNumber: string (nullable = false)
 |    |    |-- effectiveDate: null (nullable = true)
 |    |    |-- terminationDate: null (nullable = true)
 |    |    |-- isCurrent: null (nullable = true)
 |    |    |-- isActive: null (nullable = true)
 |    |    |-- telecomType: string (nullable = false)
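To see where the element line in this output comes from, the following minimal sketch builds a comparable schema by hand and prints it. It assumes only that the Spark SQL types are on the classpath; the two-field address struct is a shortened, hypothetical stand-in for the full address columns.

```scala
import org.apache.spark.sql.types._

// Hypothetical, shortened address struct (the real one has eight fields)
val addressStruct = StructType(Seq(
  StructField("lineOne", StringType, nullable = false),
  StructField("cityName", StringType, nullable = false)
))

// An ArrayType always carries an element type; here it is the struct above
val schema = StructType(Seq(
  StructField("memberSubscriberId", IntegerType, nullable = false),
  StructField("memberAddresses", ArrayType(addressStruct, containsNull = false), nullable = false)
))

// treeString produces the same tree layout that printSchema() prints
println(schema.treeString)

// The "element" line corresponds to ArrayType.elementType
val arr = schema("memberAddresses").dataType.asInstanceOf[ArrayType]
println(arr.elementType)
```

The printed tree contains an element: struct line under memberAddresses even though no field named element was ever declared, which suggests the line is part of how array types are displayed rather than an extra nesting level in the data.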


Expected schema:
root
 |-- memberSubscriberId: integer (nullable = false)
 |-- memberId: integer (nullable = false)
 |-- memberIdSuffix: integer (nullable = false)
 |-- memberLastName: string (nullable = false)
 |-- memberFirstName: string (nullable = false)
 |-- memberMiddleInitial: string (nullable = false)
 |-- memberSocialSecurityNumber: string (nullable = false)
 |-- memberGender: string (nullable = false)
 |-- memberBirthDate: timestamp (nullable = false)
 |-- memberworkphonenumber: string (nullable = false)
 |-- memberworkphoneextensionnumber: string (nullable = false)
 |-- membercellphone: string (nullable = false)
 |-- memberAddresses: array (nullable = false)
 |    |-- lineOne: string (nullable = false)
 |    |-- lineTwo: string (nullable = false)
 |    |-- lineThree: string (nullable = false)
 |    |-- cityName: string (nullable = false)
 |    |-- stateCode: string (nullable = false)
 |    |-- zipCode: string (nullable = false)
 |    |-- countyCode: string (nullable = false)
 |    |-- countryCode: string (nullable = false)
 |-- memeberPhoneNumbers: array (nullable = false)
 |    |-- phoneNumber: string (nullable = false)
 |    |-- effectiveDate: null (nullable = true)
 |    |-- terminationDate: null (nullable = true)
 |    |-- isCurrent: null (nullable = true)
 |    |-- isActive: null (nullable = true)
 |    |-- telecomType: string (nullable = false)

I have tried several different approaches to get this to work:

      ).as("memberAddresses"),
      array(
        struct(

      ).as("memberAddresses"),
       struct(

      ).as("memberAddresses"),
      array(

      ).as("memberAddresses"),
      collect_list(
        struct(
Tags: scala, apache-spark, apache-spark-sql

Solution


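Simply put, the expected schema you want is not possible. An array column always has an element type, and printSchema shows that type on an element line; in your case the element is a struct. So I would say that the schema you got is exactly what you were trying to achieve.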
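As a rough check of this point, the sketch below builds a one-element array of structs and serializes a row to JSON. The sample row, the shortened column list, and the local SparkSession are assumptions for illustration, not part of the original question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, struct}

// Local session for illustration only (assumption; spark-shell already provides `spark`)
val spark = SparkSession.builder().master("local[1]").appName("array-of-struct-json").getOrCreate()
import spark.implicits._

// Hypothetical single address row with a shortened column list
val addrDF = Seq((1, "12 Main St", "Springfield"))
  .toDF("memberSubscriberId", "lineOne", "cityName")

val nested = addrDF.select(
  $"memberSubscriberId",
  array(struct($"lineOne", $"cityName")).as("memberAddresses")
)

// The schema tree shows an "element: struct" line under memberAddresses
nested.printSchema()

// The JSON form is a plain array of objects; no "element" key is written
val json = nested.toJSON.collect().head
println(json)
```

In the printed JSON, memberAddresses is an array whose items are the address objects themselves, which is consistent with element being a display detail of the schema rather than a field in the data.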

