json - org.apache.spark.SparkException:将流写入 blob 存储时任务不可序列化
问题描述
我经历了很多类似的帖子,但我无法理解这里的原因。我有整个代码工作。
我后来只添加了以下代码:
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
def toJson(value: Map[Symbol, Any]): String = {
toJson(value map { case (k,v) => k.name -> v})
}
def toJson(value: Any): String = {
mapper.writeValueAsString(value)
}
def toMap[V](json:String)(implicit m: Manifest[V]): Map[String, Any] = fromJson[Map[String,Any]](json)
def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
mapper.readValue[T](json)
}
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
现在,当我在笔记本中执行以下 writestream 单元格时:
data.writeStream
.option("checkpointLocation", _checkpointLocation)
.format("avro")
.partitionBy("Date", "Hour")
.option("path", _containerPath)
.start()
我收到此错误:
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer
Serialization stack:
- object not serializable (class: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer, value: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer@660424b3)
- field (class: com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, name: _paranamer, type: interface com.fasterxml.jackson.module.paranamer.shaded.Paranamer)
谁能帮我理解这里可能出了什么问题?谢谢!
解决方案
这是罪魁祸首
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
def toJson(value: Map[Symbol, Any]): String = {
toJson(value map { case (k,v) => k.name -> v})
}
def toJson(value: Any): String = {
mapper.writeValueAsString(value)
}
def toMap[V](json:String)(implicit m: Manifest[V]): Map[String, Any] = fromJson[Map[String,Any]](json)
def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
mapper.readValue[T](json)
}
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
这意味着您的 json 解析器不可序列化尝试为您的 json 类/对象实现可序列化
尝试使用Gson或
class Jsonparser extends serializable
将是解决这个问题。
看看 task not serialzable 是如何发生在这里- org.apache.spark.SparkException: Task not serializable
推荐阅读
- android - Jetpack Compose - 恢复 LazyColumn 滚动状态
- sitecore - 在内容树中显示文件夹而不是在 url
- c# - 输出数量的理解问题
- gitlab - 在并行阶段内优先考虑一些作业实例化
- java - Apache POI评估AllFormulaCells不起作用
- spring - 使用 Thymeleaf 检查多个 URL 参数
- azure - 从 git 存储库运行 Azure VM 扩展 Powershell 脚本
- c# - 我没有错误,但我无法在数据库中插入值,
- jquery - 如何从多列下拉选择器插件向控制器发送参数值
- html - 在 Jquery 中处理从 REST api 返回的 JSON 列表