java - 如何在 Java / Kotlin 中创建返回复杂类型的 Spark UDF?
问题描述
我正在尝试编写一个返回复杂类型的 UDF:
private val toPrice = UDF1<String, Map<String, String>> { s ->
val elements = s.split(" ")
mapOf("value" to elements[0], "currency" to elements[1])
}
val type = DataTypes.createStructType(listOf(
DataTypes.createStructField("value", DataTypes.StringType, false),
DataTypes.createStructField("currency", DataTypes.StringType, false)))
df.sqlContext().udf().register("toPrice", toPrice, type)
但任何时候我使用这个:
df = df.withColumn("price", callUDF("toPrice", col("price")))
我得到一个神秘的错误:
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$28: (string) => struct<value:string,currency:string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: {value=138.0, currency=USD} (of class java.util.LinkedHashMap)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
... 19 more
我尝试使用自定义数据类型:
class Price(val value: Double, val currency: String) : Serializable
使用返回该类型的 UDF:
private val toPrice = UDF1<String, Price> { s ->
val elements = s.split(" ")
Price(elements[0].toDouble(), elements[1])
}
但后来我得到另一个MatchError
抱怨这种Price
类型的东西。
如何正确编写可以返回复杂类型的 UDF?
解决方案
TL;DR该函数应该返回一个类的对象org.apache.spark.sql.Row
。
Spark 提供了两种主要的UDF
定义变体。
udf
使用 Scala 反射的变体:def udf[RT](f: () ⇒ RT)(implicit arg0: TypeTag[RT]): UserDefinedFunction
def udf[RT, A1](f: (A1) ⇒ RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1]): UserDefinedFunction
- ...
def udf[RT, A1, A2, ..., A10](f: (A1, A2, ..., A10) ⇒ RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1], arg2: TypeTag[A2], ..., arg10: TypeTag[A10])
定义
Scala 闭包 ... 作为用户定义函数 (UDF) 的参数。数据类型是根据 Scala 闭包的签名自动推断的。
这些变体在没有架构的情况下使用原子或代数数据类型。例如,有问题的函数将在 Scala 中定义:
case class Price(value: Double, currency: String) val df = Seq("1 USD").toDF("price") val toPrice = udf((s: String) => scala.util.Try { s split(" ") match { case Array(price, currency) => Price(price.toDouble, currency) } }.toOption) df.select(toPrice($"price")).show // +----------+ // |UDF(price)| // +----------+ // |[1.0, USD]| // +----------+
在这个变体中,返回类型是自动编码的。
由于它依赖于反射,这个变体主要是为 Scala 用户设计的。
udf
提供模式定义的变体(您在此处使用的)。此变体的返回类型应与 for 相同Dataset[Row]
:正如在另一个答案中指出的那样,您只能使用SQL 类型映射表中列出的类型(装箱或未装箱的原子类型,
java.sql.Timestamp
/java.sql.Date
以及高级集合)。复杂结构 (
structs
/StructTypes
) 用 表示org.apache.spark.sql.Row
。不允许与代数数据类型或等效数据混合。例如(Scala 代码)struct<_1:int,_2:struct<_1:string,_2:struct<_1:double,_2:int>>>
应表示为
Row(1, Row("foo", Row(-1.0, 42))))
不是
(1, ("foo", (-1.0, 42))))
或任何混合变体,例如
Row(1, Row("foo", (-1.0, 42))))
提供此变体主要是为了确保 Java 互操作性。
在这种情况下(等同于所讨论的),定义应类似于以下定义:
import org.apache.spark.sql.types._ import org.apache.spark.sql.functions.udf import org.apache.spark.sql.Row val schema = StructType(Seq( StructField("value", DoubleType, false), StructField("currency", StringType, false) )) val toPrice = udf((s: String) => scala.util.Try { s split(" ") match { case Array(price, currency) => Row(price.toDouble, currency) } }.getOrElse(null), schema) df.select(toPrice($"price")).show // +----------+ // |UDF(price)| // +----------+ // |[1.0, USD]| // | null| // +----------+
排除异常处理的所有细微差别(通常
UDFs
应该控制null
输入并按照惯例优雅地处理格式错误的数据)Java 等价物应该或多或少像这样:UserDefinedFunction price = udf((String s) -> { String[] split = s.split(" "); return RowFactory.create(Double.parseDouble(split[0]), split[1]); }, DataTypes.createStructType(new StructField[]{ DataTypes.createStructField("value", DataTypes.DoubleType, true), DataTypes.createStructField("currency", DataTypes.StringType, true) }));
上下文:
为了给您一些上下文,这种区别也反映在 API 的其他部分中。例如,您可以DataFrame
从架构和以下序列创建Rows
:
def createDataFrame(rows: List[Row], schema: StructType): DataFrame
或将反射与一系列Products
def createDataFrame[A <: Product](data: Seq[A])(implicit arg0: TypeTag[A]): DataFrame
但不支持混合变体。
换句话说,您应该提供可以使用编码的输入RowEncoder
。
当然,您通常不会udf
用于这样的任务:
import org.apache.spark.sql.functions._
df.withColumn("price", struct(
split($"price", " ")(0).cast("double").alias("price"),
split($"price", " ")(1).alias("currency")
))
相关:
推荐阅读
- nginx - proxy_pass return redirect in nginx
- c# - 如果区域操作系统是泰语,如何保留日期输入的公历日期格式
- node.js - sequelize transaction 中的 setShooter 是什么?
- c - 在 Linux 上创建自定义服务以通过串行 UART 与调制解调器通信
- typescript - 打字稿类型任意数量的通用字段
- reactjs - 如何修复 `...theme.mixins` 意外的 Token SyntaxError?
- testing - 使用 Jest 的 material-ui Dialog 组件测试问题
- angularjs - AngularJS ui-router 如何确定应用程序的根 url?
- javascript - 在任意深度更新 JS 对象
- c++ - ctypes,添加静态库时未定义的符号