org.apache.spark.SparkException: Task not serializable when writing a stream to blob storage

Problem description

I have gone through many similar posts, but I cannot understand the cause here. The whole code was working before.

All I added afterwards was the following code:

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
def toJson(value: Map[Symbol, Any]): String = {
    toJson(value map { case (k,v) => k.name -> v})
}
def toJson(value: Any): String = {
    mapper.writeValueAsString(value)
}
def toMap[V](json:String)(implicit m: Manifest[V]): Map[String, Any] = fromJson[Map[String,Any]](json)
def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
    mapper.readValue[T](json)
}
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

Now, when I execute the following writeStream cell in the notebook:

data.writeStream
    .option("checkpointLocation", _checkpointLocation)
    .format("avro")
    .partitionBy("Date", "Hour")
    .option("path",  _containerPath)
    .start()

I get this error:

    org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
    Caused by: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer
Serialization stack:
    - object not serializable (class: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer, value: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer@660424b3)
    - field (class: com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, name: _paranamer, type: interface com.fasterxml.jackson.module.paranamer.shaded.Paranamer)

Can anyone help me understand what might be going wrong here? Thanks!

Tags: json, scala, apache-spark, serialization, jackson

Solution


This is the culprit:

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
def toJson(value: Map[Symbol, Any]): String = {
    toJson(value map { case (k,v) => k.name -> v})
}
def toJson(value: Any): String = {
    mapper.writeValueAsString(value)
}
def toMap[V](json:String)(implicit m: Manifest[V]): Map[String, Any] = fromJson[Map[String,Any]](json)
def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
    mapper.readValue[T](json)
}
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

This means your JSON parser is not serializable. Try making your JSON class/object serializable, or switch to Gson.

Declaring the parser as

class JsonParser extends Serializable

would be the way to solve this (a slightly fuller sketch follows below).
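One common way out (a minimal sketch with my own naming, assuming you stay with Jackson rather than switching to Gson) is to keep the mapper out of anything the streaming closure captures, for example in a standalone object:

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical helper (names are mine, not from the post). Because it lives in a
// top-level object, the mapper is built lazily on each JVM and is never shipped
// inside a task closure, so its non-serializable internals (CachingParanamer)
// never have to be serialized.
object JsonUtil {
  lazy val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    m.registerModule(DefaultScalaModule)
    m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    m
  }

  def toJson(value: Any): String = mapper.writeValueAsString(value)

  def toMap(json: String): Map[String, Any] =
    mapper.readValue(json, classOf[Map[String, Any]])
}

Calling JsonUtil.toJson(...) from a map or foreachBatch then no longer drags the mapper into the serialized closure; if the mapper must stay a field of a serializable class instead, marking it @transient lazy val has a similar effect.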

Take a look at how "Task not serializable" happens here: org.apache.spark.SparkException: Task not serializable
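To get a feel for how the capture happens, here is a stripped-down, hypothetical illustration (class and field names are mine, not from the post): any instance field referenced inside a transformation pulls the whole enclosing instance into the task closure, and serialization then fails on the first non-serializable object it reaches.

import org.apache.spark.rdd.RDD
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical example, not code from the original post.
class Pipeline(records: RDD[String]) extends Serializable {
  // DefaultScalaModule registers a ParanamerAnnotationIntrospector whose
  // CachingParanamer field is not serializable.
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  def toJson(): RDD[String] =
    // Referencing `mapper` here captures the enclosing Pipeline instance in the
    // task closure; Spark's closure cleaner then tries to serialize it and fails
    // on the CachingParanamer, much like the stack trace above.
    records.map(r => mapper.writeValueAsString(r))
}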

