首页 > 解决方案 > Scala / Spark:如何定义可序列化的案例类(非REPL)?

问题描述

Scala新手在这里。我在 Zeppelin 笔记本上写了一份 Spark 作业的草稿。我使用了 Datasets api,因此为了让我的案例类在执行时正确序列化,我在单独的笔记本单元格中ds.map(s => MyCaseClass(...))定义了我的案例类。否则,它就行不通了。

现在我正在编写一个实际的工作来在 Apache Airflow 中运行它。主文件如下所示:

class MainObj {
   private val spark = SparkSession.builder()...getOrCreate()       
   import spark.implicits._

   case class MyCaseClass(...)

   def run() {       
      spark.read
      ...
      .map(s => MyCaseClass(...))
      ...
   }
}

object MainObj {
   def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) = 
      new MainObj(arguments, sparkConf)

   def main(args: Array[String]): Unit = {
      MainObj(...).run()
   }
}

在这种情况下,我得到:

如果无法访问定义此类的范围,则无法为内部类 MainObj$MyCaseClass 生成编码器。

如果我添加org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)inside 或 before run(),我会得到:

引起:java.io.NotSerializableException:MainObj序列化堆栈:对象不可序列化(类:MainObj,值:MainObj@2f11d889)

我还尝试将案例类移动到单独的文件(不起作用)或内部run()(甚至不编译)。

对这个问题感到非常沮丧......任何人都可以帮忙,或者至少给我指出一个解释case classes,spark.implicits和之间关系的地方scopes吗?

标签: scalaapache-spark

解决方案


您需要在根级别定义案例类,而不是在类/对象中。

您可以尝试以下结构吗?

case class MyCaseClass(...)

class MainObj {
   private val spark = SparkSession.builder()...getOrCreate()       
   import spark.implicits._

   def run() {       
      spark.read
      ...
      .map(s => MyCaseClass(...))
      ...
   }
}

object MainObj {
   def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) = 
      new MainObj(arguments, sparkConf)

   def main(args: Array[String]): Unit = {
      MainObj(...).run()
   }
}

推荐阅读