scala - 列表的 Spark/Scala 序列化。任务不可序列化:java.io.NotSerializableException
问题描述
问题在于 Spark 数据集和 Ints 列表的序列化。Scala 版本是 2.10.4,Spark 版本是 1.6。
这与其他问题类似,但我无法根据这些回复使其工作。我已经简化了代码以显示问题。
我有一个案例类:
case class FlightExt(callsign: Option[String], serials: List[Int])
我的主要方法是这样的:
val (ctx, sctx) = SparkUtil.createContext() // just a helper function to build context
val flightsDataFrame = separateFlightsMock(sctx) // reads data from Parquet file
import sctx.implicits._
flightsDataFrame.as[FlightExt]
.map(flight => flight.callsign)
.show()
我收到以下错误:
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
- object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package scala)
- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
- object (class scala.reflect.internal.Types$UniqueThisType, scala.type)
- field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$TypeRef$$anon$6, scala.Int)
- field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$5, name: elementType$2, type: class scala.reflect.api.Types$TypeApi)
- object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$5, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType))
- field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType),array,ObjectType(class [Ljava.lang.Object;)))
- writeObject data (class: scala.collection.immutable.$colon$colon)
- object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType),array,ObjectType(class [Ljava.lang.Object;))))
- field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class scala.collection.mutable.WrappedArray$,ObjectType(interface scala.collection.Seq),make,invoke(mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType),array,ObjectType(class [Ljava.lang.Object;)),true))
- writeObject data (class: scala.collection.immutable.$colon$colon)
如果我从 FlightExt 中删除列表,那么一切正常,这表明 lambda 函数序列化没有问题。
Scala 本身似乎可以序列化 Int 的列表。也许 Spark 在序列化列表方面存在问题?
我也尝试过使用 Java Integer。
编辑:
如果我将 List 更改为 Array 它可以工作,但如果我有这样的东西:
case class FlightExt(callsign: Option[String], other: Array[AnotherCaseClass])
它也因相同的错误而失败
我是 Scala 和 Spark 的新手,可能会遗漏一些东西,但任何解释都将不胜感激。
解决方案
把FlightExt
类放在里面object
,检查下面的代码。
object Flight {
case class FlightExt(callsign: Option[String], var serials: List[Int])
}
利用Flight.FlightExt
val (ctx, sctx) = SparkUtil.createContext() // just a helper function to build context
val flightsDataFrame = separateFlightsMock(sctx) // reads data from Parquet file
import sctx.implicits._
flightsDataFrame.as[Flight.FlightExt]
.map(flight => flight.callsign)
.show()
推荐阅读
- python-3.x - 如何在 Pyspark 函数“Withcolumn”中传递列表
- python - 在 Visual Studio Code 中运行 Python 代码有困难,在 Python shell 中运行,但不在此处
- xml - 如何通过将 xslt 导入另一个 xslt 来获取输入值?
- macos - 使用 docker 在 Mac 和 nginx 中安装 Nginx
- python - 是否可以检查计算机是否使用 python 连接到域?
- python - 康威的生命游戏推进网格(与上一个不同!)
- javascript - 可以设置网站的加载时间吗?
- installation - Silverstripe 4.6 setup .env - vars,缺少 install.php,语言?
- r - 如何在 R 中为纵向设计正确设置 lmer?
- c# - 仅当其他三个 MVVM 对象已启用时才启用 MVVM 对象