首页 > 解决方案 > 如何在 spark 中使用输入 Array[(Date, Double)] 注册 udf

问题描述

我必须编写一个参数为 Array[(Date, Double)](collect_list(struct(col1, col2)) 的结果的 udf。

function (c) {
return function () {
var baseDF = c.cu.loadParquetDF(c.sqc, '/path', true).selectExpr('`date`', '`value`', '`svalue`');
var resDF = new (Java.type('org.apache.spark.ml.feature.SQLTransformer'))().setStatement('select date_range_build(valuePairs) from (select collect_list(struct(`date`,`value`)) as valuePairs from __THIS__)').transform(baseDF);
return resDF;
}().limit(200);
}

(1) 当我使用以下定义时:

def DateRangeBuild(dateValuePairs: Seq[(AnyRef, Number)]) = {...}
sql.udf.register("date_range_build", TimeRangeUdfs.DateRangeBuild(_: Seq[(AnyRef, Number)]))

java.lang.ClassCastException:org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 不能转换为 scala.Tuple2

(2) 当我使用以下定义时:

def DateRangeBuild(dateValuePairs: GenericRowWithSchema) = {...}
sql.udf.register("date_range_build", TimeRangeUdfs.DateRangeBuild(_: GenericRowWithSchema))

java.lang.ClassCastException:scala.collection.mutable.WrappedArray$ofRef 不能转换为 org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

(3) 当我使用以下定义时:

def DateRangeBuild(dateValuePairs: Seq[GenericRowWithSchema]) = {...}
sql.udf.register("date_range_build", TimeRangeUdfs.DateRangeBuild(_: Seq[GenericRowWithSchema]))

IDEA 提示:没有类型的隐式参数:Nothing,Nothing。

组装并运行后:

未定义的函数:'date_range_build'。此函数既不是注册的临时函数,也不是在数据库“默认”中注册的永久函数

标签: scalaapache-spark

解决方案


你的输入应该是Seq[Row],然后映射Row(Date,Double)使用getAs[T](...)方法Row


推荐阅读