scala - Spark - 将整行传递给 udf,然后在 udf 中获取列名
问题描述
我正在将 Spark 与 Scala 一起使用,并希望将整行传递给 udf 并选择 udf 侧的每个列名和列值。我怎样才能做到这一点?
我正在尝试关注-
inputDataDF.withColumn("errorField", mapCategory(ruleForNullValidation) (col(_*)))
def mapCategory(categories: Map[String, Boolean]) = {
udf((input:Row) => //write a recursive function to check if each row is in categories if yes check for null if null then false, repeat this for all columns and then combine results)
})
解决方案
在 Spark 1.6 中,您可以Row
用作外部类型和struct
表达式。作为表达。列名可以从模式中获取。例如:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct}
val df = Seq((1, 2, 3)).toDF("a", "b", "c")
val f = udf((row: Row) => row.schema.fieldNames)
df.select(f(struct(df.columns map col: _*))).show
// +-----------------------------------------------------------------------------+
// |UDF(named_struct(NamePlaceholder, a, NamePlaceholder, b, NamePlaceholder, c))|
// +-----------------------------------------------------------------------------+
// | [a, b, c]|
// +-----------------------------------------------------------------------------+
可以使用Row.getAs
方法按名称访问值。
推荐阅读
- php - 两个自定义字段的 WordPress 元查询。
- scala - 在 Spark shell 中创建 DF 和临时表的 Scala 脚本 - 问题
- ios - 折叠导航栏时不需要的快速动画
- regex - 在vim中以**开头的行大写
- amazon-web-services - 使用 AWS 预签名 URL 进行 PUT 的应用程序是什么?
- c# - 如何在C#中计算字符串数组中的字母
- java - 使用 Spring Batch 汇总数据
- java - 如何发送客户端可以使用 HTTPURLConnection 的 inputStream 读取的数据?
- python - While 循环及其条件
- javascript - 如何在反应引导表中使列可单击并避免将其显示为“添加行”中的文本字段?