Spark/Scala serialization of a List. Task not serializable: java.io.NotSerializableException

Problem description

The problem concerns serializing a Spark Dataset whose case class contains a List of Ints. I am using Scala 2.10.4 and Spark 1.6.

This is similar to other questions, but I haven't been able to make it work based on those answers. I have simplified the code down to a minimal example that shows the problem.

I have a case class:

case class FlightExt(callsign: Option[String], serials: List[Int])

and my main method looks like this:

    val (ctx, sctx) = SparkUtil.createContext() // just a helper function to build context
    val flightsDataFrame = separateFlightsMock(sctx) // reads data from Parquet file

    import sctx.implicits._
    flightsDataFrame.as[FlightExt]
      .map(flight => flight.callsign)
      .show()

I get the following error:

SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
    - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package scala)
    - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
    - object (class scala.reflect.internal.Types$UniqueThisType, scala.type)
    - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
    - object (class scala.reflect.internal.Types$TypeRef$$anon$6, scala.Int)
    - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$5, name: elementType$2, type: class scala.reflect.api.Types$TypeApi)
    - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$5, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType))
    - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType),array,ObjectType(class [Ljava.lang.Object;)))
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType),array,ObjectType(class [Ljava.lang.Object;))))
    - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class scala.collection.mutable.WrappedArray$,ObjectType(interface scala.collection.Seq),make,invoke(mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType),array,ObjectType(class [Ljava.lang.Object;)),true))
    - writeObject data (class: scala.collection.immutable.$colon$colon)

If I remove the list from FlightExt, everything works fine, which suggests there is no problem with serializing the lambda function itself.

Scala itself seems to serialize a List of Ints without trouble. Perhaps Spark has a problem serializing Lists?

I have also tried using Java's Integer.

Edit:

If I change the List to an Array it works, but if I have something like this:

case class FlightExt(callsign: Option[String], other: Array[AnotherCaseClass])

it also fails with the same error.

I'm new to Scala and Spark and may be missing something, but any explanation would be appreciated.

Tags: scala, apache-spark, serialization

Solution

Move the FlightExt case class inside an object, as in the code below. Case classes used with Dataset encoders should be defined at the top level or inside an object, not nested inside a class or method.

object Flight {
  case class FlightExt(callsign: Option[String], serials: List[Int])
}

Then refer to it as Flight.FlightExt:

    val (ctx, sctx) = SparkUtil.createContext() // just a helper function to build context
    val flightsDataFrame = separateFlightsMock(sctx) // reads data from Parquet file

    import sctx.implicits._
    flightsDataFrame.as[Flight.FlightExt]
      .map(flight => flight.callsign)
      .show()
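To see why nesting matters, here is a minimal, Spark-free sketch (the `Outer`/`Inner`/`SerializationDemo` names are illustrative, not from the original post). A case class declared inside a class carries a hidden `$outer` reference to the enclosing instance, so Java serialization tries to drag that instance along and fails when it is not serializable. Declaring the case class inside an `object` (or at the top level) removes the hidden reference. The Spark error in the question surfaces through encoder reflection rather than plain Java serialization, but the same rule of thumb applies: define case classes used with Datasets at the top level or inside objects.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Case class nested in an object: no hidden reference to any instance.
object Flight {
  case class FlightExt(callsign: Option[String], serials: List[Int])
}

// A plain class that is NOT Serializable.
class Outer {
  // Nested in a class: each Inner keeps an $outer pointer to its Outer
  // instance, so serializing Inner tries to serialize Outer too.
  case class Inner(x: Int)
}

object SerializationDemo {
  // Returns true if obj survives Java serialization.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializes(Flight.FlightExt(Some("AB123"), List(1, 2, 3)))) // true
    println(serializes((new Outer).Inner(1)))                           // false
  }
}
```

Running this prints `true` for the object-nested case class and `false` for the class-nested one, which mirrors the fix above.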
