scala - 为什么这个 Spark 代码在本地模式下工作,而不在集群模式下工作?
问题描述
所以,我有这样的事情。请注意,baseTrait
这里的 (a trait) 是可序列化的,因此thisClass
(Object 类) 也应该是可序列化的。
object thisClass extends baseTrait {
private var someVar = null
def someFunc: RDD[...] {
...
// assigned some string value or an empty string value (not null anymore)
someVar = ...
...
if (someVar != "")
someRDD.filter(x => aFunc(x, someVar))
else
...
}
在集群模式下,当我调用someFunc
函数(这是一个静态方法,因为thisClass
它是一个 Object 类)时,我得到一个空指针异常,我认为这与someVar
未正确序列化有关。因为当我这样做时,它可以在集群模式下完美运行。
if (someVar != "") {
val someVar_ = someVar
someRDD.filter(x => aFunc(x, someVar_))
}
知道原始代码中出了什么问题,什么时候thisClass
可以序列化?
我的猜测是,在另一个类中使用可序列化类的变量很好,但是如果您尝试在该类中执行此操作,您可能会遇到问题,因为在这种情况下,您将让运行时尝试序列化同一个类从哪里调用闭包。你怎么看?
解决方案
在这种情况下,您没有遇到序列化问题。
基本上,在集群模式下发生的事情thisClass.someFunc
是永远不会在远程执行器的 JVM 中实际执行。在 executor 上,thisClass
被实例化,并被someVar
赋值null
。然后,当thisClass
对象处于该状态时,spark 框架会直接在该执行程序的数据分区中可用的记录上执行您的 lambda 函数。
避免这种情况的一种方法是将分配移动到对象someVar
的主体中thisClass
。这样做将在someVar
实例化对象时立即分配值。请记住,此代码将在集群中的每个执行程序上执行。
如果这不可能,另一种选择是将您映射RDD[T]
到RDD[(T, String)]
,其中字符串someVar
用于每条记录,然后您的过滤器可能类似于.filter(x => aFunc(x._1, x._2))
. 此方法将使用更多内存,因为您将拥有someVar
' 值的许多副本。
推荐阅读
- telegram - Telegrap API(不是 bot api),列出聊天中的所有服务消息
- angular - Angular:将json获取响应转换为具有函数的类
- perl - Perl - 嵌套数组作为哈希值
- javascript - JavaScript 注入 - 操作对象
- android - 获取用户正在与之交互的 ListView 位置
- java - Java中的直接ByteBuffer是否类似于C中的寄存器?
- scala - Spark无法使用查询作为表名将数据库表读取到带有jdbc的数据帧
- web-services - Web 服务子域托管与虚拟目录
- c# - 在抽象方法中使用继承的接口
- python - Flask-Sqlalchemy Missing BEGIN 似乎导致会话不同步