首页 > 解决方案 > 如何将数据帧传递给 spark udf?

问题描述

我想定义一个udf。在函数体中,它将从外部数据框中搜索数据。我怎样才能做到这一点?我试图将数据框传递给 udf。但它不能工作。

示例代码:

val countryDF = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("Country.csv")

val geo = (originString: String, dataFrame: DataFrame) => {
  // Search data from countryDF
  val row = dataFrame.where(col("CountryName") === originString)
  if (row != Nil){
    // set data to row index 2
    row.getAs[String](2)
  }
  else{
    "0"
  }
}
val udfGeo = udf(geo)

val cLatitudeAndLongitude = udfGeo(countryTestDF.col("CountryName"), lit(countryDF))

countryTestDF = countryTestDF.withColumn("Latitude", cLatitudeAndLongitude)

标签: apache-sparkdataframeuser-defined-functions

解决方案


如果要使用 UDF,则必须在列上工作,而不是在数据框对象上工作。您必须创建一个新列来获取 UDF 的输出。

def geo(originString : String, CountryName: String) : Int = {

    if (CountryName == originString){
      return 1}
    else{
      return 0}
  }

val geoUDF = udf(geo _)

val newData = countryDF.withColum("isOrignOrNot", geoUDF(col("originString"),col("CountryName"))

推荐阅读