首页 > 解决方案 > 广播值中的 NullPointer 作为参数传递

问题描述

:)

我想说我是 Spark 的新手,因为很多这些帖子都开始了..但事实是我并不是那么新。尽管如此,我仍然面临广播变量的这个问题。

当一个变量被广播时,每个执行器都会收到它的一个副本。稍后,当在执行器中执行的代码部分(比如说map或foreach)中引用该变量时,如果未将驱动程序中设置的变量引用传递给它,则执行器不知道是什么我们在谈论吗?我认为在这里可以完美解释

我的问题是我得到了一个 nullPointerException ,即使我将广播引用传递给了执行者。

class A {
    var broadcastVal: Broadcast[Dataframe] = _
    ...

    def method1 {
        broadcastVal = otherMethodWhichSendBroadcast
        doSomething(broadcastVal, others)
    }
}

class B {
    def doSomething(...) {
        forEachPartition {x => doSomethingElse(x, broadcasVal)}
    }
}

object C {
    def doSomethingElse(...) {
        broadcastVal.value.show --> Exception
    }
}

我错过了什么?

提前致谢!

标签: apache-sparkapache-spark-sql

解决方案


RDD 和 DataFrames 已经是分布式结构,不需要将它们作为局部变量广播。(org.apache.spark.sql.functions.broadcast()函数(在进行连接时使用)不是局部变量广播)


即使您明智地尝试代码语法,它也不会显示任何编译错误,而是会抛出100 RuntimeException%有效的错误。NullPointerException

解释行为的示例:

package examples

import org.apache.log4j.Level
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, SparkSession}

object BroadCastCheck extends App {
  org.apache.log4j.Logger.getLogger("org").setLevel(Level.OFF)
  val spark = SparkSession.builder().appName(getClass.getName).master("local").getOrCreate()
  val sc = spark.sparkContext
  val df = spark.range(100).toDF()
  var broadcastVal: Broadcast[DataFrame] = sc.broadcast(df)


  val t1 = sc.parallelize(0 until 10)
  val t2 = sc.broadcast(2) // this is right since its local variable can be primitive or map or any scala collection
  val t3 = t1.filter(_ % t2.value == 0).persist() //this is the way of ha
  t3.foreach {
    x =>
      println(x)
      // broadcastVal.value.toDF().show // null pointer  wrong way
    //   spark.range(100).toDF().show // null pointer  wrong way

  }
}

结果:(如果您取消评论broadcastVal.value.toDF().showspark.range(100).toDF().show在上面的代码中)

Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.execution.SparkPlan.sparkContext(SparkPlan.scala:56)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.metrics$lzycompute(WholeStageCodegenExec.scala:528)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.metrics(WholeStageCodegenExec.scala:527)

在这里进一步阅读广播变量和广播函数之间的区别......


推荐阅读