首页 > 解决方案 > spark2 sql深度嵌套数组结构与镶木地板

问题描述

给定一个像这样的深度嵌套的镶木地板结构

|-- bet: struct (nullable = true)
|    |-- sides: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- side: string (nullable = true)
|    |    |    |-- betID: string (nullable = true)
|    |    |    |-- secondarybetID: string (nullable = true)
|    |    |    |-- parties: struct (nullable = true)
|    |    |    |    |-- partyIDs: array (nullable = true)
|    |    |    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |    |    |-- partyID: string (nullable = true)
|    |    |    |    |    |    |-- partyRole: integer (nullable = true)
|    |    |    |    |    |    |-- partySubGrp: struct (nullable = true)
|    |    |    |    |    |    |    |-- partySubIDs: array (nullable = true)
|    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |    |    |    |    |    |-- partySubID: string (nullable = true)
|    |    |    |    |    |    |    |    |    |-- partySubIDType: integer (nullable = true)

考虑到一个赌注有几个方面,不知何故,我们只对边数组中的第一边感兴趣。如何找到partyRole 为 10 的一方的参与方?

在 prestosql 我可以做类似的事情

 SELECT 
        filter(bet.sides[1].parties.partyids, x -> x.partyrole=10)[1] as party10
    FROM 
        parquetbets
    WHERE 
        cardinality(filter(bet.sides[1].parties.partyids, x -> x.partyrole=10))>0

我该如何在 spark2 sql 中做同样的事情?

 SELECT bet.sides[1] from parquetbets 

在 spark2 sql 中,上面返回一个数组,在嵌套结构上没有进一步修剪的范围?

IE

 SELECT bet.sides[1].parties from parquetbets

返回空值。我尝试了一些组合,但结果返回 WrappedArrayElements,它不提供查询嵌套数据的机制。在 prestosql 返回的结果中包含字段名称,因此很容易继续深入研究结构。

有人可以指出 spark2 sql 如何支持这一点吗?如果 spark2 sql 不能,那么 spark 数据帧如何做到这一点?

标签: apache-sparkapache-spark-sqlparquettrinospark2

解决方案


愚蠢的问题:您是否考虑过将 DataSet API 与编码器一起使用?它提供了一个功能 API 来推理您的问题(这是一种更容易解决功能的方法)。

否则考虑爆炸你的数组以对扁平数据进行推理(参见 org.apache.spark.sql.functions.explode)。

scala 中的示例:

  case class PartyId(partyID: String, partyRole: Int)
  case class Party(partyIDs: Seq[PartyId])
  case class Side(side: String, betId: String, parties: Party)
  case class Bet(sides: Seq[Side])

  import spark.implicits._
  val ds = spark.read.load("my-bets.parquet").as[Bet]

  val firstSidesDS = ds.flatMap(_.sides.headOption) //take the first side if exists

  val result: Dataset[Side] = firstSidesDS.filter(_.parties.partyIDs.exists(_.partyRole == 10)) //Here I return sides for which there is at least a partyRole = 10

推荐阅读