java - Using sparkSession.sql causes a "Task not serializable" error
Problem description
I am trying to execute the following code:
SparkSession sparkSession = SparkSession
        .builder()
        .appName("test")
        // .master("local")
        .enableHiveSupport()
        .getOrCreate();
Dataset<Row> df = sparkSession.sql("## My SQL query to HIVE here ##").toDF();
Dataset<Row> rulesDataset = sparkSession.sql("select * from rules.md");
String code = rulesDataset.collectAsList().get(0).get(0).toString();
df.show(false);

Script script = new GroovyShell().parse(code);
UDF3<Double, String, String, String> rulesExecutingUDF = new UDF3<Double, String, String, String>() {
    @Override
    public String call(Double val1, String val2, String val3) throws Exception {
        Binding binding = new Binding();
        binding.setVariable("VAL1", val1);
        binding.setVariable("VAL2", val2);
        binding.setVariable("VAL3", val3);
        script.setBinding(binding);
        Object value = script.run();
        return value.toString();
    }
};
sparkSession.udf().register("rulesExecutingUDF", rulesExecutingUDF, DataTypes.StringType);
df = df.withColumn("NEW_COL", callUDF("rulesExecutingUDF", col("VAL1"), col("VAL2"), col("VAL3")));
df.show();
The problem is that I get a serialization error here saying the task is not serializable. After a lot of trial and error, I found that the statement
Dataset<Row> df = sparkSession.sql("## My SQL query to HIVE here ##").toDF();
may have something to do with it. I fetch this dataset from a Hive table on the server.
What I did was, instead of that query, prepare a similar dataset with the same schema using hard-coded values, like
StructField[] structFields = new StructField[]{
        new StructField("VAL1", DataTypes.DoubleType, true, Metadata.empty()),
        new StructField("VAL2", DataTypes.StringType, true, Metadata.empty()),
        new StructField("VAL3", DataTypes.StringType, true, Metadata.empty())
};
StructType structType = new StructType(structFields);
List<Row> rows = new ArrayList<>();
rows.add(RowFactory.create(160.0, "X", "I"));
rows.add(RowFactory.create(200.0, "D", "C"));
Dataset<Row> df = sparkSession.createDataFrame(rows, structType);
Then I do not get the serialization error and the Spark job executes successfully.
The schema of the dataset is identical either way, and the values are the same as the ones I get from the Hive table. I cannot figure out why this happens. Can someone help me?
Debugger messages
Diagnostics: User class threw exception: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2304)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:725)
    at com.test2.FeefactMD.main(FeefactMD.java:65)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
Caused by: java.io.NotSerializableException: Script1
Serialization stack:
    - object not serializable (class: Script1, value: Script1@121b5eab)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.test2.FeefactMD, functionalInterfaceMethod=org/apache/spark/sql/api/java/UDF3.call:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/test2/FeefactMD.lambda$main$c068ede9$1:(Lgroovy/lang/Script;Ljava/lang/Double;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/Double;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class com.test2.FeefactMD$$Lambda$61/1143680308, com.test2.FeefactMD$$Lambda$61/1143680308@1fe9c374)
    - field (class: org.apache.spark.sql.UDFRegistration$$anonfun$30, name: f$20, type: interface org.apache.spark.sql.api.java.UDF3)
    - object (class org.apache.spark.sql.UDFRegistration$$anonfun$30, )
    - element of array (index: 5)
    - array (class [Ljava.lang.Object;, size 6)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
Solution
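The serialization stack points at the cause: the compiled groovy.lang.Script instance (Script1) is captured by the UDF and lands in SerializedLambda.capturedArgs, and Script is not Serializable. A common workaround is to capture only the script source String (which is serializable) and compile it lazily inside call(). The following is a minimal sketch of that pattern using only the JDK, with a hypothetical CompiledScript class standing in for groovy.lang.Script:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

public class CaptureDemo {

    // Hypothetical stand-in for groovy.lang.Script: compiled code that is NOT Serializable.
    static class CompiledScript {
        final String source;
        CompiledScript(String source) { this.source = source; }
        String run(double v) { return source + "->" + v; }
    }

    // Same shape as a Spark Java UDF: a function that must survive Java serialization.
    interface SerializableFn extends Function<Double, String>, Serializable {}

    // Mimics what Spark's ClosureCleaner checks before shipping a closure to executors.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;  // a captured non-serializable object cannot be written
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String code = "VAL1 > 100 ? \"X\" : \"D\"";

        // Broken pattern (as in the question): the lambda captures the pre-compiled
        // object, so it ends up in SerializedLambda.capturedArgs and serialization fails.
        CompiledScript script = new CompiledScript(code);
        SerializableFn broken = v -> script.run(v);
        System.out.println("captures compiled script: " + canSerialize(broken)); // false

        // Fixed pattern: capture only the String source and compile lazily per call.
        SerializableFn fixed = v -> new CompiledScript(code).run(v);
        System.out.println("captures source string:   " + canSerialize(fixed)); // true
    }
}
```

Applied to the job above, this would mean registering a UDF3 that captures only the `code` String and calls `new GroovyShell().parse(code)` inside `call()` (ideally cached in a transient field so each executor compiles it only once), instead of capturing the pre-parsed `script`. This is a sketch of one common workaround, assuming the Script instance is the only non-serializable capture.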