首页 > 解决方案 > 为什么这个 Spark 代码在本地模式下工作,而不在集群模式下工作?

问题描述

所以,我有这样的事情。请注意,baseTrait这里的 (a trait) 是可序列化的,因此thisClass(Object 类) 也应该是可序列化的。

object thisClass extends baseTrait {
  private var someVar = null 

  def someFunc: RDD[...] {
    ...
    // assigned some string value or an empty string value (not null anymore)
    someVar = ... 
    ...
    if (someVar != "")
      someRDD.filter(x => aFunc(x, someVar))
    else
      ...
  }

在集群模式下,当我调用someFunc函数(这是一个静态方法,因为thisClass它是一个 Object 类)时,我得到一个空指针异常,我认为这与someVar未正确序列化有关。因为当我这样做时,它可以在集群模式下完美运行。

if (someVar != "") {
  val someVar_ = someVar
  someRDD.filter(x => aFunc(x, someVar_))
}

知道原始代码中出了什么问题,什么时候thisClass可以序列化?

我的猜测是,在另一个类中使用可序列化类的变量很好,但是如果您尝试在该类中执行此操作,您可能会遇到问题,因为在这种情况下,您将让运行时尝试序列化同一个类从哪里调用闭包。你怎么看?

标签: scalaapache-spark

解决方案


在这种情况下,您没有遇到序列化问题。

基本上,在集群模式下发生的事情thisClass.someFunc是永远不会在远程执行器的 JVM 中实际执行。在 executor 上,thisClass被实例化,并被someVar赋值null。然后,当thisClass对象处于该状态时,spark 框架会直接在该执行程序的数据分区中可用的记录上执行您的 lambda 函数。

避免这种情况的一种方法是将分配移动到对象someVar的主体中thisClass。这样做将在someVar实例化对象时立即分配值。请记住,此代码将在集群中的每个执行程序上执行。

如果这不可能,另一种选择是将您映射RDD[T]RDD[(T, String)],其中字符串someVar用于每条记录,然后您的过滤器可能类似于.filter(x => aFunc(x._1, x._2)). 此方法将使用更多内存,因为您将拥有someVar' 值的许多副本。


推荐阅读