How to refine a Spark StructType schema based on a list of required fields?

Problem Description

I am trying to create a StructType schema from an existing schema. I have a list containing the fields required for the new schema. The hard part is that the schema describes nested JSON data and contains complex fields such as ArrayType(StructType). Here is the code for the schema:

    import org.apache.spark.sql.types._

    val schema1: Seq[StructField] = Seq(
      StructField("playerId", StringType, true),
      StructField("playerName", StringType, true),
      StructField("playerCountry", StringType, true),
      StructField("playerBloodType", StringType, true)
    )

    val schema2: Seq[StructField] =
      Seq(
        StructField("PlayerHistory", ArrayType(
          StructType(
            Seq(
              StructField("Rating", StringType, true),
              StructField("Height", StringType, true),
              StructField("Weight", StringType, true),
              StructField("CoachDetails",
                StructType(
                  Seq(
                    StructField("CoachName", StringType, true),
                    StructField("Address",
                      StructType(
                        Seq(
                          StructField("AddressLine1", StringType, true),
                          StructField("AddressLine2", StringType, true),
                          StructField("CoachCity", StringType, true))), true),
                    StructField("Suffix", StringType, true))), true),
              StructField("GoalHistory", ArrayType(
                StructType(
                  Seq(
                    StructField("MatchDate", StringType, true),
                    StructField("NumberofGoals", StringType, true),
                    StructField("SubstitutionIndicator", StringType, true))), true), true),
              StructField("receive_date", DateType, true))
          ), true
        )))


    val requiredFields = List("playerId", "playerName", "Rating", "CoachName", "CoachCity", "MatchDate", "NumberofGoals")

    val schema: StructType = StructType(schema1 ++ schema2)
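
A quick way to double-check the combined nested structure is StructType's printTreeString helper; a minimal sketch (output abbreviated):

    // Prints the schema as an indented tree, which makes the nesting easy to verify.
    schema.printTreeString()
    // root
    //  |-- playerId: string (nullable = true)
    //  |-- playerName: string (nullable = true)
    //  ...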

The variable schema is the current schema, and requiredFields contains the fields we need in the new schema. We also need the parent blocks in the new schema. The output schema should look like this:

    val outputSchema =
      Seq(
        StructField("playerId", StringType, true),
        StructField("playerName", StringType, true),
        StructField("PlayerHistory", ArrayType(
          StructType(Seq(
            StructField("Rating", StringType, true),
            StructField("CoachDetails",
              StructType(Seq(
                StructField("CoachName", StringType, true),
                StructField("Address",
                  StructType(Seq(
                    StructField("CoachCity", StringType, true))), true))), true),
            StructField("GoalHistory", ArrayType(
              StructType(Seq(
                StructField("MatchDate", StringType, true),
                StructField("NumberofGoals", StringType, true))), true), true))), true)))

I tried to solve the problem recursively with the following code.

schema.fields.map(f => filterSchema(f, requiredFields)).filter(_.name != "")

  def filterSchema(field: StructField, requiredColumns: Seq[String]): StructField = {
    field match{
      case StructField(_, inner : StructType, _ ,_) => StructField(field.name,StructType(inner.fields.map(f => filterSchema(f, requiredColumns))))
      case StructField(_, ArrayType(structType: StructType, _),_,_) => 
        if(requiredColumns.contains(field.name))
          StructField(field.name, ArrayType(StructType(structType.fields.map(f => filterSchema(f,requiredColumns))),true), true)
        else
          StructField("",StringType,true)
      case StructField(_, _, _, _) => if(requiredColumns.contains(field.name)) field else StructField("",StringType,true)
    }
  }

 

However, I am unable to filter out the inner struct fields.

I feel the base case of the recursive function needs some modification. Any help here would be greatly appreciated. Thanks in advance.
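
For reference, one possible adjustment along those lines is sketched below, assuming the intent is to keep a parent block whenever any of its descendants is required; filterSchema2 and the surrounding names are only illustrative:

  // Sketch: same idea as filterSchema above, but non-required leaves are dropped
  // at every nested level, and a parent struct/array is kept only if it still
  // has required children after filtering.
  def filterSchema2(field: StructField, requiredColumns: Seq[String]): Option[StructField] = {
    field match {
      case StructField(name, inner: StructType, nullable, _) =>
        val kept = inner.fields.flatMap(f => filterSchema2(f, requiredColumns))
        if (kept.nonEmpty) Some(StructField(name, StructType(kept), nullable)) else None
      case StructField(name, ArrayType(inner: StructType, containsNull), nullable, _) =>
        val kept = inner.fields.flatMap(f => filterSchema2(f, requiredColumns))
        if (kept.nonEmpty) Some(StructField(name, ArrayType(StructType(kept), containsNull), nullable)) else None
      case _ =>
        if (requiredColumns.contains(field.name)) Some(field) else None
    }
  }

  val refined = StructType(schema.fields.flatMap(f => filterSchema2(f, requiredFields)))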

Tags: scala, apache-spark, pyspark, apache-spark-sql

Solution


Here's how I would do it:

class SchemaRefiner(schema: StructType, requiredColumns: Seq[String]) {
  var FINALSCHEMA: Array[StructField] = Array[StructField]()

  private def refine(schematoRefine: StructType, requiredColumns: Seq[String]): Unit = {
    schematoRefine.foreach(f => {
      if (requiredColumns.contains(f.name)) {
        f match {
          // Nested struct: refine its children with a fresh SchemaRefiner.
          case StructField(_, inner: StructType, _, _) =>
            FINALSCHEMA = FINALSCHEMA :+ StructField(f.name, StructType(new SchemaRefiner(inner, requiredColumns).getRefinedSchema), true)
          // Array of structs: refine the element struct the same way.
          case StructField(_, ArrayType(structType: StructType, _), _, _) =>
            FINALSCHEMA = FINALSCHEMA :+ StructField(f.name, ArrayType(StructType(new SchemaRefiner(structType, requiredColumns).getRefinedSchema)), true)
          // Plain field: keep it as is.
          case StructField(_, _, _, _) =>
            FINALSCHEMA = FINALSCHEMA :+ f
        }
      }
    })
  }

  def getRefinedSchema: Array[StructField] = {
    refine(schema, requiredColumns)
    this.FINALSCHEMA
  }
}

This walks over the StructFields, and each time a nested StructType is encountered the function is called recursively to build the new StructType.

val fields = new SchemaRefiner(schema,requiredFields)
val newSchema = fields.getRefinedSchema
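
One thing to note with this refiner: a field is only examined at all when its own name passes the requiredColumns check, so the parent block names ("PlayerHistory", "CoachDetails", "Address", "GoalHistory") have to be in the list as well for the nested structure to survive. A minimal usage sketch under that assumption (the variable names are only illustrative):

// Parent block names added so that refine() descends into the nested structs.
val columns = requiredFields ++ Seq("PlayerHistory", "CoachDetails", "Address", "GoalHistory")
val refinedFields = new SchemaRefiner(schema, columns).getRefinedSchema
val refinedSchema = StructType(refinedFields)
refinedSchema.printTreeString()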
