apache-spark - java.lang.RuntimeException: org.apache.spark.SparkException: Task not serializable in solr.DefaultSource.createRelation
Question
I have already looked at many posts about this serialization error, but I still cannot get past it.
There is a DataFrame, modProductsData, and a Map, L2L3Map. I want to replace the values in the column PRIMARY_CATEGORY with the corresponding values from L2L3Map.
val L2L3Map = L2.collect.map(row => (row.get(0).toString, row.get(1).toString)).toMap
val L2L3MapUDF = udf { s: String => L2L3Map.get(s) }
val productsData = spark.read.format("solr").options(readFromSourceClusterOpts).load
var modProductsData = productsData.withColumn("Prime_L2_s",
  when(col("PRIMARY_CATEGORY").isNotNull,
    when(col("PRIMARY_CATEGORY").isin(L3ids: _*), L2L3MapUDF(col("PRIMARY_CATEGORY")))
      .otherwise(when(col("PRIMARY_CATEGORY").isin(L2ids: _*), col("PRIMARY_CATEGORY"))
        .otherwise(lit(null))))
    .otherwise(lit(null)))
Here is more of the error log:
java.lang.RuntimeException: org.apache.spark.SparkException: Task not serializable
at solr.DefaultSource.createRelation(DefaultSource.scala:31)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 89 elided
Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:371)
at
Answer
It worked with the code below :)
def mapPhantom(flagMap: Map[String, String]): (String) => String = {
  (id: String) => flagMap.getOrElse(id, null)
}
val L2L3Map = L2.collect.map(row => (row.get(0).toString, row.get(1).toString)).toMap
val L2L3UDF = udf(mapPhantom(L2L3Map))
var modProductsData = productsData.withColumn("Prime_L2_s",
  when(col("PRIMARY_CATEGORY").isNotNull,
    when(col("PRIMARY_CATEGORY").isin(L3ids: _*), L2L3UDF(col("PRIMARY_CATEGORY")))
      .otherwise(when(col("PRIMARY_CATEGORY").isin(L2ids: _*), col("PRIMARY_CATEGORY"))
        .otherwise(lit(null))))
    .otherwise(lit(null)))
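Why this helps: the original inline UDF `udf { s: String => L2L3Map.get(s) }` closes over a field of the enclosing object (e.g. a REPL line object or driver class), so Spark tries to serialize that whole object and fails. `mapPhantom` is a method that takes the map as a parameter and returns a plain `String => String`; the returned function captures only the serializable `Map`, nothing else. A minimal sketch of the same pattern in plain Scala (no Spark, names mirror the answer above):

```scala
object MapPhantomDemo {
  // Returns a function whose closure contains only flagMap,
  // not the object that defined it.
  def mapPhantom(flagMap: Map[String, String]): String => String =
    (id: String) => flagMap.getOrElse(id, null)

  def main(args: Array[String]): Unit = {
    // Hypothetical category ids, for illustration only.
    val l2l3 = Map("L3-100" -> "L2-10", "L3-200" -> "L2-20")
    val lookup = mapPhantom(l2l3)
    println(lookup("L3-100")) // L2-10
    println(lookup("unknown")) // null (no mapping found)
  }
}
```

In the Spark job, `udf(mapPhantom(L2L3Map))` wraps this self-contained function, so the closure cleaner can serialize it without dragging in the outer class. Returning an `Option`-free `null` here matches the answer's code; using `Option[String]` in the UDF would be the more idiomatic way to produce a nullable column.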