scala - 如何根据必填字段列表细化 Spark StructType Schema?
问题描述
我正在尝试基于现有的 schema 创建一个新的 StructType schema。我有一个列表,其中列出了新 schema 需要保留的字段。难点在于该 schema 描述的是嵌套的 JSON 数据,其中包含 ArrayType(StructType) 等复杂字段。以下是现有 schema 的代码:
// Flat, top-level player attributes: each one is a nullable string column.
val schema1: Seq[StructField] =
  Seq("playerId", "playerName", "playerCountry", "playerBloodType")
    .map(columnName => StructField(columnName, StringType, nullable = true))
// Nested part of the schema: a single "PlayerHistory" column holding an array of
// structs, which in turn embed a "CoachDetails" struct (with a nested "Address"
// struct) and a "GoalHistory" array of structs. Every field is nullable.
val schema2: Seq[StructField] = {
  // Shorthand for a nullable string column.
  def text(columnName: String): StructField =
    StructField(columnName, StringType, nullable = true)

  val address = StructType(Seq(
    text("AddressLine1"),
    text("AddressLine2"),
    text("CoachCity")))

  val coachDetails = StructType(Seq(
    text("CoachName"),
    StructField("Address", address, nullable = true),
    text("Suffix")))

  val goal = StructType(Seq(
    text("MatchDate"),
    text("NumberofGoals"),
    text("SubstitutionIndicator")))

  val historyEntry = StructType(Seq(
    text("Rating"),
    text("Height"),
    text("Weight"),
    StructField("CoachDetails", coachDetails, nullable = true),
    StructField("GoalHistory", ArrayType(goal, containsNull = true), nullable = true),
    StructField("receive_date", DateType, nullable = true)))

  Seq(StructField("PlayerHistory", ArrayType(historyEntry, containsNull = true), nullable = true))
}
// Names of the leaf fields that must survive in the refined schema.
val requiredFields =
  "playerId" :: "playerName" :: "Rating" :: "CoachName" :: "CoachCity" :: "MatchDate" :: "NumberofGoals" :: Nil
// Full current schema: the flat player columns followed by the nested history column.
val schema: StructType = StructType((schema1 ++ schema2).toArray)
变量 schema 是当前的 schema,requiredFields 列出了新 schema 中需要保留的字段。新 schema 还必须保留这些字段的父级结构(即包含它们的外层 StructType / ArrayType)。期望的输出 schema 如下所示:
// Expected result of refining `schema` with `requiredFields`: only the required
// leaf fields remain, together with the struct/array containers that enclose them.
// Each StructType is built from a Seq of fields, and "GoalHistory" sits next to
// "CoachDetails" inside the "PlayerHistory" element, as in the source schema.
val outputSchema =
  Seq(
    StructField("playerId", StringType, true),
    StructField("playerName", StringType, true),
    StructField("PlayerHistory",
      ArrayType(
        StructType(Seq(
          StructField("Rating", StringType, true),
          StructField("CoachDetails",
            StructType(Seq(
              StructField("CoachName", StringType, true),
              StructField("Address",
                StructType(Seq(
                  StructField("CoachCity", StringType, true)
                )), true)
            )), true),
          StructField("GoalHistory",
            ArrayType(
              StructType(Seq(
                StructField("MatchDate", StringType, true),
                StructField("NumberofGoals", StringType, true)
              )), true), true)
        )), true), true)
  )
我尝试使用以下代码以递归方式解决问题。
// Run filterSchema over every top-level field, then discard the placeholder
// fields (empty name) that it returns for columns that are not kept.
schema.fields.map(filterSchema(_, requiredFields)).filterNot(_.name == "")
/**
 * Prunes a single field against a list of required column names.
 *
 * Leaf fields are kept only when their name is listed. A struct field, or an
 * array-of-struct field, is kept when at least one of its descendants is kept,
 * and is rebuilt so that it contains only the surviving children. A container
 * whose own name is listed but which has no surviving children is kept unchanged.
 *
 * @param field           the field to prune
 * @param requiredColumns names of the fields that must be retained
 * @return the pruned field, or a placeholder `StructField("", StringType, true)`
 *         when the field must be dropped; callers filter on the empty name
 */
def filterSchema(field: StructField, requiredColumns: Seq[String]): StructField = {
  // Placeholder meaning "drop this field"; recognised by its empty name.
  val dropped = StructField("", StringType, true)
  val listed = requiredColumns.contains(field.name)

  // Recursively prune the children and remove the placeholders right away, so
  // that dropped children never leak into the rebuilt struct.
  def prune(inner: StructType): Array[StructField] =
    inner.fields.map(child => filterSchema(child, requiredColumns)).filter(_.name != "")

  field.dataType match {
    case inner: StructType =>
      val kept = prune(inner)
      // copy(...) preserves the original nullable flag and metadata.
      if (kept.nonEmpty) field.copy(dataType = StructType(kept))
      else if (listed) field
      else dropped
    case ArrayType(inner: StructType, containsNull) =>
      val kept = prune(inner)
      if (kept.nonEmpty) field.copy(dataType = ArrayType(StructType(kept), containsNull))
      else if (listed) field
      else dropped
    case _ =>
      if (listed) field else dropped
  }
}
但是,我无法过滤掉内部结构字段。
感觉需要对递归函数的基准情形(base case)做一些修改。非常感谢任何帮助。提前致谢。
解决方案
这是我的做法:
/**
 * Builds a reduced copy of a (possibly nested) schema that contains only the
 * required fields together with the struct / array-of-struct containers that
 * enclose them.
 *
 * Rules applied to each field:
 *  - a leaf field is kept only when its name is in `requiredColumns`;
 *  - a struct or array-of-struct field is kept, pruned to its surviving
 *    children, when at least one descendant is kept;
 *  - a container whose own name is listed but which has no surviving children
 *    is kept unchanged.
 *
 * @param schema          the schema to refine
 * @param requiredColumns names of the fields that must be retained
 */
class SchemaRefiner(schema: StructType, requiredColumns: Seq[String]) {
  // Kept as a public var so existing readers of this member keep working. It
  // holds the result of the most recent getRefinedSchema call.
  var FINALSCHEMA: Array[StructField] = Array.empty[StructField]

  // Prunes every field of a struct and keeps only the survivors.
  private def refine(schematoRefine: StructType, requiredColumns: Seq[String]): Array[StructField] =
    schematoRefine.fields.flatMap(f => refineField(f, requiredColumns))

  // Prunes one field; None means the field is dropped.
  private def refineField(f: StructField, requiredColumns: Seq[String]): Option[StructField] = {
    val listed = requiredColumns.contains(f.name)
    f.dataType match {
      case inner: StructType =>
        val kept = refine(inner, requiredColumns)
        // copy(...) preserves the original nullable flag and metadata.
        if (kept.nonEmpty) Some(f.copy(dataType = StructType(kept)))
        else if (listed) Some(f)
        else None
      case ArrayType(inner: StructType, containsNull) =>
        val kept = refine(inner, requiredColumns)
        if (kept.nonEmpty) Some(f.copy(dataType = ArrayType(StructType(kept), containsNull)))
        else if (listed) Some(f)
        else None
      case _ =>
        if (listed) Some(f) else None
    }
  }

  /**
   * Computes the refined top-level fields.
   *
   * The result is recomputed from scratch on every call, so calling this
   * method repeatedly does not accumulate duplicate fields.
   *
   * @return the retained fields, in their original order
   */
  def getRefinedSchema: Array[StructField] = {
    FINALSCHEMA = refine(schema, requiredColumns)
    FINALSCHEMA
  }
}
这将遍历所有 StructField,每次遇到嵌套的 StructType 时,都会递归调用该函数,以得到精简后的 StructType。
// Create a refiner for the full schema, then ask it for the pruned top-level fields.
val fields: SchemaRefiner = new SchemaRefiner(schema, requiredFields)
val newSchema: Array[StructField] = fields.getRefinedSchema
推荐阅读
- visual-studio-code - VSCode 运行错误的预启动任务
- javascript - 导出到视频时向 HTML5 画布添加声音不起作用
- r - 控制单个中断/标签 Facet Grid / ggplot2
- oracle - 为什么我对 current_timestamp 的第一次调用比第二次调用产生的时间晚?
- python - 从python中的文本中提取日期
- api - 如何获取书籍热门笔记和亮点[API]?
- python - 如何找到两个 n*m 数组的相似列?
- java - 在同一个端口使用两个本地 ip
- jquery - Bootstrap 4 Dropdown 克隆事件处理不正确
- html - 打字稿文件中的DOMDocument