scala - 如何在 spark 中使用输入 Array[(Date, Double)] 注册 udf
问题描述
我必须编写一个参数为 Array[(Date, Double)](collect_list(struct(col1, col2)) 的结果的 udf。
function (c) {
return function () {
var baseDF = c.cu.loadParquetDF(c.sqc, '/path', true).selectExpr('`date`', '`value`', '`svalue`');
var resDF = new (Java.type('org.apache.spark.ml.feature.SQLTransformer'))().setStatement('select date_range_build(valuePairs) from (select collect_list(struct(`date`,`value`)) as valuePairs from __THIS__)').transform(baseDF);
return resDF;
}().limit(200);
}
(1) 当我使用以下定义时:
def DateRangeBuild(dateValuePairs: Seq[(AnyRef, Number)]) = {...}
sql.udf.register("date_range_build", TimeRangeUdfs.DateRangeBuild(_: Seq[(AnyRef, Number)]))
java.lang.ClassCastException:org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 不能转换为 scala.Tuple2
(2) 当我使用以下定义时:
def DateRangeBuild(dateValuePairs: GenericRowWithSchema) = {...}
sql.udf.register("date_range_build", TimeRangeUdfs.DateRangeBuild(_: GenericRowWithSchema))
java.lang.ClassCastException:scala.collection.mutable.WrappedArray$ofRef 不能转换为 org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
(3) 当我使用以下定义时:
def DateRangeBuild(dateValuePairs: Seq[GenericRowWithSchema]) = {...}
sql.udf.register("date_range_build", TimeRangeUdfs.DateRangeBuild(_: Seq[GenericRowWithSchema]))
IDEA 提示:没有类型的隐式参数:Nothing,Nothing。
组装并运行后:
未定义的函数:'date_range_build'。此函数既不是注册的临时函数,也不是在数据库“默认”中注册的永久函数
解决方案
你的输入应该是Seq[Row]
,然后映射Row
到(Date,Double)
使用getAs[T](...)
方法Row
推荐阅读
- database - 如何使用 JaguarDB 存储几何对象
- linux - 仅从 Bash 中的 JSON 文件中提取一些键
- kubernetes - 版本 \"v1\" 中的 KubernetesPodOperator Pod 无法作为 Pod 处理:v1.Pod.Spec
- sqoop - Sqoop 导入按列拆分,不同的数据库拆分该数据
- javascript - 在 2 个单独的数组上执行相同的过滤器和映射功能
- c# - 在 ASP.NET Web API 中禁用模型验证 [必需]
- reactjs - 将发送目标与 OnClick 事件一起反应
- string - String.equals(StringBuilder) 和 StringBuilder.equals(String) 混淆
- sql - 基于单个计数值从联接表中排除结果
- csv - 复杂邮件合并(CSV 到 Word、CSV 到 PDF 或其他)