首页 > 解决方案 > Spark - 将整行传递给 udf,然后在 udf 中获取列名

问题描述

我正在将 Spark 与 Scala 一起使用,并希望将整行传递给 udf 并选择 udf 侧的每个列名和列值。我怎样才能做到这一点?

我正在尝试关注-

inputDataDF.withColumn("errorField", mapCategory(ruleForNullValidation) (col(_*)))

def mapCategory(categories: Map[String, Boolean]) = {
  udf((input:Row) =>  //write a recursive function to check if each row is in categories if yes check for null if null then false, repeat this for all columns and then combine results)   
})

标签: scalaapache-spark

解决方案


在 Spark 1.6 中,您可以Row用作外部类型和struct 表达式。作为表达。列名可以从模式中获取。例如:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct}

val df = Seq((1, 2, 3)).toDF("a", "b", "c")
val f = udf((row: Row) => row.schema.fieldNames)
df.select(f(struct(df.columns map col: _*))).show

// +-----------------------------------------------------------------------------+
// |UDF(named_struct(NamePlaceholder, a, NamePlaceholder, b, NamePlaceholder, c))|
// +-----------------------------------------------------------------------------+
// |                                                                    [a, b, c]|
// +-----------------------------------------------------------------------------+

可以使用Row.getAs方法按名称访问值。


推荐阅读