NullPointerException on a DataFrame

Problem description

I have the following method, written in Scala:

def fillEmptyCells: Unit = {
    val hourIndex = _weather.schema.fieldIndex("Hour")
    val dateIndex = _weather.schema.fieldIndex("Date")
    val groundSurfaceIndex = _weather.schema.fieldIndex("GroundSurface")
    val snowyGroundIndex = _weather.schema.fieldIndex("SnowyGroundSurface")
    val precipitationIndex = _weather.schema.fieldIndex("catPrec")
    val snowDepthIndex = _weather.schema.fieldIndex("catSnowDepth")

    var resultDf: DataFrame = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], _weather.schema)

    val days = _weather.select("Date").distinct().rdd
    _weather.where("Date = '2014-08-01'").show()  // works fine: runs on the driver
    days.foreach(x => {  // this closure runs on the executors
      println(s"Date = '${x.getDate(0)}'")
      _weather.where(s"Date = '${x.getDate(0)}'").show()  // <- NullPointerException here
      val day = _weather.where(s"Date = '${x.getDate(0)}'")
      val dayValues = day.where("Hour = 6").first()
      val grSur = dayValues.getString(groundSurfaceIndex)
      val snSur = dayValues.getString(snowyGroundIndex)
      val prec = dayValues.getString(precipitationIndex)
      val snowDepth = dayValues.getString(snowDepthIndex)
      val dayRddMapped = day.rdd.map(y => (y(0), y(1), grSur, snSur, y(4), y(5), y(6), y(7), prec, snowDepth))
        .foreach(z => {
          resultDf = resultDf.union(Seq(z).toDF())
        })
    })
    resultDf.show(20)
    Unit
  }

The problem is that this line throws a NullPointerException: _weather.where(s"Date = '${x.getDate(0)}'").show(). As the line above it shows, I print the where clause to the console (it looks like Date = '2014-06-03'), and the line just before the foreach uses one of those values as a parameter and works fine. _weather is a class field and is not modified while this method runs. The debugger shows something even stranger: after the first iteration, _weather is null.

Where does this magic come from, and how do I avoid it?

Also, any suggestions about the architecture and code quality are welcome here.

Stack trace:

java.lang.NullPointerException
    at org.apache.spark.sql.Dataset.where(Dataset.scala:1344)
    at org.[package].WeatherHelper$$anonfun$fillEmptyCells$1.apply(WeatherHelper.scala:148)
    at org.[package].WeatherHelper$$anonfun$fillEmptyCells$1.apply(WeatherHelper.scala:146)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
19/01/10 13:39:35 ERROR Executor: Exception in task 6.0 in stage 10.0 (TID 420)

The WeatherHelper frames shown here are only an excerpt of the whole stack trace, which repeats about 20 times.

Tags: scala, apache-spark

Solution


You cannot use DataFrames inside RDD code (here, the DataFrame _weather is used inside days.foreach). The DataFrame is null there because it exists only on the driver, not on the executors. A short reproduction and a rewrite sketch follow.
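The mechanics, in short: a Dataset's internals (its SparkSession and query plan) are transient, so when the closure passed to RDD.foreach captures _weather and is deserialized inside an executor task, those fields come back as null, and the first method called on the captured DataFrame throws. A minimal reproduction sketch, assuming Spark 2.x and a local session (all names here are illustrative, not from the question):

import org.apache.spark.sql.SparkSession

object NpeRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("npe-repro")
      .getOrCreate()

    val df = spark.range(3).toDF("n")

    // The closure below is serialized and run by executor tasks. `df` is
    // captured, but its transient internals do not survive the serialization
    // round trip, so calling where/show on it throws a NullPointerException
    // at Dataset.where, just as in the stack trace above.
    df.rdd.foreach { _ =>
      df.where("n = 1").show()
    }

    spark.stop()
  }
}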


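A minimal sketch of the fix under the question's schema (the refAtSix name and the ref* column names are illustrative, not from the original code). Either collect the dates to the driver so every DataFrame call runs driver-side, or, preferably, drop the loop and fill the four columns with a join against the Hour = 6 row of each date:

import org.apache.spark.sql.functions.col

// Option 1: quick fix. collect() brings the rows to the driver, so the
// DataFrame calls inside the loop no longer run inside an executor closure.
val days = _weather.select("Date").distinct().collect()
days.foreach { x =>
  _weather.where(s"Date = '${x.getDate(0)}'").show() // now runs on the driver
}

// Option 2: preferred. No driver-side loop at all: take the 06:00 row of
// each date as the reference values and join them back onto every row.
val refAtSix = _weather
  .where(col("Hour") === 6)
  .select(
    col("Date").as("refDate"),
    col("GroundSurface").as("refGroundSurface"),
    col("SnowyGroundSurface").as("refSnowyGroundSurface"),
    col("catPrec").as("refCatPrec"),
    col("catSnowDepth").as("refCatSnowDepth"))

val resultDf = _weather
  .join(refAtSix, _weather("Date") === refAtSix("refDate"))
  .withColumn("GroundSurface", col("refGroundSurface"))
  .withColumn("SnowyGroundSurface", col("refSnowyGroundSurface"))
  .withColumn("catPrec", col("refCatPrec"))
  .withColumn("catSnowDepth", col("refCatSnowDepth"))
  .drop("refDate", "refGroundSurface", "refSnowyGroundSurface",
    "refCatPrec", "refCatSnowDepth")

resultDf.show(20)

Option 2 also replaces the per-row union accumulation in the original method, which re-plans the query on every iteration and scales poorly.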