首页 > 解决方案 > 将 2D 结构数组取消嵌套到 2D 数组结构中

问题描述

我有一列类型array<array<struct<a: String, b: Int>>>

我想要一列类型struct<a: array<array<String>>, b: array<array<Int>>

理想情况下,此过程应自动取消所有结构字段的嵌套(即无需我手动指定字段“a”和“b”),但任何有效的方法在这里都将非常有帮助。

我拥有的示例代码(我正在尝试ds变成expected)。

case class Struct(foo: String, bar: Int)
case class Schema(structs: Vector[Vector[Struct]])

val ss = spark
import ss.implicits._

val ds = Seq(Schema(Vector(Vector(Struct("a", 1), Struct("b", 2)), Vector(Struct("c", 3))))).toDS

val expected = Seq(
    (Vector(Vector("a", "b"), Vector("c")), Vector(Vector(1, 2), Vector(3)))
).toDF("foo", "bar")

标签: apache-spark

解决方案


最短的解决方案是使用transform高阶函数(在 Spark 2.4 中引入):

ds.selectExpr(
  "transform(structs, xs -> transform(xs, x -> x.foo)) as foo",
  "transform(structs, xs -> transform(xs, x -> x.bar)) as bar"
)

在旧版本中,您需要等效的udf* 或使用 typed map

ds.as[Schema]
  .map(x => (
    x.structs.map(_.map(_.foo)), 
    x.structs.map(_.map(_.bar))
)).toDF("foo", "bar")

前一种解决方案可以概括为:

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.DataFrame

def expand(ds: DataFrame, col: String) = {

  val fields = ds.schema(col).dataType match {
    case ArrayType(ArrayType(s: StructType, _), _) => s.fieldNames
  }
  val exprs = fields.map {
    field => expr(
      s"transform(`$col`, xs -> transform(xs, x -> x.`$field`)) as `$field`"
    )
  }
  ds.select(exprs: _*)
}

expand(ds.toDF, "structs")

后者可能不是那么多,除非你想使用 Scala 反射(这是一个严重的矫枉过正)。


* 这些行周围的东西应该可以解决问题:

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.functions.udf

def extract[T : TypeTag](field: String) = udf(
  (xs: Seq[Seq[Row]]) => xs.map(_.map(_.getAs[T](field)))
)

val extractString = extract[String] _
val extractInt = extract[Int] _

ds.select(
  extractString("foo")($"structs").as("foo"),
  extractInt("bar")($"structs").as("bar")
)

推荐阅读