scala - Scala / Spark:如何定义可序列化的案例类(非REPL)?
问题描述
Scala新手在这里。我在 Zeppelin 笔记本上写了一份 Spark 作业的草稿。我使用了 Datasets api,因此为了让我的案例类在执行时正确序列化,我在单独的笔记本单元格中ds.map(s => MyCaseClass(...))
定义了我的案例类。否则,它就行不通了。
现在我正在编写一个实际的工作来在 Apache Airflow 中运行它。主文件如下所示:
class MainObj {
private val spark = SparkSession.builder()...getOrCreate()
import spark.implicits._
case class MyCaseClass(...)
def run() {
spark.read
...
.map(s => MyCaseClass(...))
...
}
}
object MainObj {
def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) =
new MainObj(arguments, sparkConf)
def main(args: Array[String]): Unit = {
MainObj(...).run()
}
}
在这种情况下,我得到:
如果无法访问定义此类的范围,则无法为内部类 MainObj$MyCaseClass 生成编码器。
如果我添加org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
inside 或 before run()
,我会得到:
引起:java.io.NotSerializableException:MainObj序列化堆栈:对象不可序列化(类:MainObj,值:MainObj@2f11d889)
我还尝试将案例类移动到单独的文件(不起作用)或内部run()
(甚至不编译)。
对这个问题感到非常沮丧......任何人都可以帮忙,或者至少给我指出一个解释case classes
,spark.implicits
和之间关系的地方scopes
吗?
解决方案
您需要在根级别定义案例类,而不是在类/对象中。
您可以尝试以下结构吗?
case class MyCaseClass(...)
class MainObj {
private val spark = SparkSession.builder()...getOrCreate()
import spark.implicits._
def run() {
spark.read
...
.map(s => MyCaseClass(...))
...
}
}
object MainObj {
def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) =
new MainObj(arguments, sparkConf)
def main(args: Array[String]): Unit = {
MainObj(...).run()
}
}
推荐阅读
- python - 没有名为 pytesseract 的模块
- dpdk - DPDK - 中断而不是轮询
- if-statement - 如何根据有多少空白单元格填充一个单元格
- python - 如何实现一个可以保留多个小数的层?
- html - 使用纯 CSS 响应移动视图的背景图像
- javascript - 这个 jQuery 的原生 javascript 版本是什么?
- avx - AVX2 置换控制位
- java - Android 聊天在 DataSnapshot.getValue() 上崩溃
- python-3.x - 使用配置文件、string.format() 和 pathlib 路径创建文件路径
- c++ - 抛出异常:写访问冲突。这是 nullptr