scala - Task not serializable exception when using a UDF on a DataFrame
Problem description
When I call show() on the DataFrame, it throws a Task not serializable exception.
I tried making the enclosing object extend Serializable, but the error persists.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

import scala.io.Source.fromFile

object App extends Serializable {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName("LearningSpark")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Count pushes per GitHub user from the day's event log
    val inputPath = "./src/resources/2015-03-01-0.json"
    val ghLog = spark.read.json(inputPath)
    val pushes = ghLog.filter("type = 'PushEvent'")
    val grouped = pushes.groupBy("actor.login").count
    val ordered = grouped.orderBy(grouped("count").desc)
    ordered.show(5)

    // Broadcast the set of employee logins to all executors
    val empPath = "./src/resources/ghEmployees.txt"
    val employees = Set() ++ (
      for {
        line <- fromFile(empPath).getLines
      } yield line.trim)
    val bcEmployees = sc.broadcast(employees)

    import spark.implicits._
    // UDF that checks membership against the broadcast set
    val isEmp = (user: String) => bcEmployees.value.contains(user)
    val isEmployee = spark.udf.register("SetContainsUdf", isEmp)
    val filtered = ordered.filter(isEmployee($"login"))
    filtered.show()
  }
}
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/09/01 10:21:48 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
+------------------+-----+
|             login|count|
+------------------+-----+
|      greatfirebot|  192|
|diversify-exp-user|  146|
|     KenanSulayman|   72|
|        manuelrp07|   45|
|    mirror-updates|   42|
+------------------+-----+
only showing top 5 rows
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:850)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:128)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
at App$.main(App.scala:33)
at App.main(App.scala)
Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
Serialization stack:
- object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$f$2:(Lscala/Function1;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/runtime/LazyRef;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2364/2031154005, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2364/2031154005@1fd37440)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF:SetContainsUdf(actor#6.login))
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@3b65084e)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(isnotnull(type#13), (type#13 = PushEvent), UDF:SetContainsUdf(actor#6.login)))
- field (class: org.apache.spark.sql.execution.FileSourceScanExec, name: dataFilters, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.execution.FileSourceScanExec, FileScan json [actor#6,type#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
- field (class: org.apache.spark.sql.execution.FilterExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
- object (class org.apache.spark.sql.execution.FilterExec, Filter ((isnotnull(type#13) && (type#13 = PushEvent)) && UDF:SetContainsUdf(actor#6.login))
+- FileScan json [actor#6,type#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
- field (class: org.apache.spark.sql.execution.ProjectExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
- object (class org.apache.spark.sql.execution.ProjectExec, Project [actor#6]
+- Filter ((isnotnull(type#13) && (type#13 = PushEvent)) && UDF:SetContainsUdf(actor#6.login))
+- FileScan json [actor#6,type#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
- field (class: org.apache.spark.sql.execution.aggregate.HashAggregateExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
- object (class org.apache.spark.sql.execution.aggregate.HashAggregateExec, HashAggregate(keys=[actor#6.login AS actor#6.login#53], functions=[partial_count(1)], output=[actor#6.login#53, count#43L])
+- Project [actor#6]
+- Filter ((isnotnull(type#13) && (type#13 = PushEvent)) && UDF:SetContainsUdf(actor#6.login))
+- FileScan json [actor#6,type#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 14)
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1297/815648243, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1297/815648243@27438750)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 48 more
Solution
I was running Spark 2.4.4 with Scala 2.12.1 and hit the same problem (object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)), which drove me crazy. I changed the Scala version to 2.12.10 and the problem is gone. The stack trace blames scala.runtime.LazyRef, which early Scala 2.12 releases did not mark Serializable; later 2.12 patch releases fixed that, which appears to be why the upgrade helps.
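In case it helps anyone hitting the same thing: with sbt, the fix boils down to pinning the Scala version in the build. A minimal build.sbt sketch (assuming the standard Spark artifacts; adjust names and versions to your project):

scalaVersion := "2.12.10"

libraryDependencies ++= Seq(
  // Spark 2.4.4 publishes _2.12 artifacts, so this pairing is valid
  "org.apache.spark" %% "spark-core" % "2.4.4",
  "org.apache.spark" %% "spark-sql"  % "2.4.4"
)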
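If upgrading Scala is not an option, one way to avoid the problem entirely is to drop the UDF and express the membership test with Column.isin, so no user lambda has to be serialized at all. A sketch, assuming the same employees set and spark.implicits._ from the question are in scope:

// isin builds the predicate from driver-side values,
// so there is no closure for Spark to serialize
val filtered = ordered.filter($"login".isin(employees.toSeq: _*))
filtered.show()

For a modest set of employee logins this is fine; for a very large set, a broadcast join against an employees DataFrame would scale better.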