scala - Exception when using a udf function with a Spark DataFrame
Problem Description
In Spark 2.4.0, I am trying to execute the following code on a given DataFrame, unfoldedDF:

unfoldedDF: org.apache.spark.sql.DataFrame
  movieid: integer
  words: array -- element: string
  tokens: string
import org.apache.spark.sql.functions.{col, countDistinct, udf}

val tokensWithDf = unfoldedDF.groupBy("tokens").agg(countDistinct("movieid") as "df")
tokensWithDf.show()
The new DataFrame created is:

tokensWithDf: org.apache.spark.sql.DataFrame
  tokens: string
  df: long
The following is then applied to it to compute the idf (42306 presumably being the total number of documents):
def findIdf(x: Long): Double = scala.math.log10(42306.0 / x)
val sqlfunc = udf(findIdf _)
tokensWithDf.withColumn("idf", sqlfunc(col("df"))).show()
It fails with the following exception:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2519)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:866)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:865)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:379)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:865)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:183)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:131)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:66)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:48)
Solution
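No accepted answer is recorded here, but this failure pattern has a well-known cause: `udf(findIdf _)` eta-expands a method into a function, and if findIdf is defined inside a class or notebook cell, the resulting closure captures the enclosing instance. Spark's ClosureCleaner (the ensureSerializable frame at the top of the trace) then fails because that outer object is not serializable. A minimal sketch of the usual fix, assuming nothing about the enclosing scope: build the UDF from a self-contained function literal so the closure captures only the constant.

import org.apache.spark.sql.functions.{col, udf}

// Anonymous function literal: captures no outer `this`, only the
// literal 42306 (the document count taken from the question).
val idfUdf = udf((df: Long) => scala.math.log10(42306.0 / df))

tokensWithDf.withColumn("idf", idfUdf(col("df"))).show()

If the method must stay on a class, the alternative is to make that class extend Serializable and mark any non-serializable fields (such as a SparkContext reference) @transient.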