首页 > 解决方案 > 如何将 df 列作为参数传递给函数?

问题描述

我写了下面的函数

object AgeClassification {
  def AgeCategory(age:Int) : String = {
    if(age<=30)
      return "Young" 
    else if(age>=65)
      return "Older" 
    else
      return "Mid-age"
  }
}

我正在尝试将数据框列作为参数传递

val df_new = df
  .withColumn("Age_Category", AgeClassification.AgeCategory(df("age")))

但得到错误

:33: 错误:类型不匹配;
找到: org.apache.spark.sql.Column
required: Int
val df_new = df.withColumn("Age_Category",AgeClassification.AgeCategory(df("age")))

如何将列作为参数传递?

val df_new = df
  .withColumn("Age_Category",AgeClassification.AgeCategory(df.age.cast(IntegerType)))   

:33: 错误:值 age 不是 org.apache.spark.sql.DataFrame
val df_new = df.withColumn("Age_Category",AgeClassification.AgeCategory(df.age.cast(IntegerType))) 的成员

val df_new = df
   .withColumn("Age_Category", AgeClassification.AgeCategory(df("age").cast(Int)))

:33:错误:使用替代方法重载方法值转换:(
至:String)org.apache.spark.sql.Column
(至:org.apache.spark.sql.types.DataType)org.apache.spark.sql.Column
不能应用于 (Int.type)
val df_new = df.withColumn("Age_Category",AgeClassification.AgeCategory(df("age").cast(Int)))

标签: scalaapache-spark

解决方案


使用 SparkSQL API 操作数据帧时,不能直接使用 scala 函数。您只能使用在Column类中或在functions类中定义的“列”函数。他们基本上将列转换为列。实际计算在 Spark 中处理。

为了说明这一点,您可以在 REPL 中尝试:

scala> df("COL1").cast("int")
res6: org.apache.spark.sql.Column = CAST(COL1 AS INT)

类型是Column,不是int,这就是为什么 scala 拒绝在这样的对象上应用你的函数(它是一个整数)。

要使用自定义函数,您需要将其包装在 UDF 中,如下所示:

val ageUDF = udf((age : Int) => AgeClassification.AgeCategory(age))
// or shorter
val ageUDF = udf(AgeClassification.AgeCategory _)

// The you may use it this way:
df.withColumn("classif", ageUDF(df("age")))

另请注意,df.age它在 pyspark 中有效,但在 ni scala 中无效。对于按名称访问列的简短方法,您可以 importspark.implicits._和 write$"age"甚至更短的'age.


推荐阅读