Spark Scala getting a NullPointerException

Problem Description

I am trying to extract a large amount of elevation data from a TIFF image, and I have a CSV file. The CSV also contains latitude, longitude, and other attributes. I loop through the CSV, get the latitude and longitude, and call the elevation lookup; the code is below. Reference: the RasterFrames question on extracting location information.

    package main.scala.sample

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    import org.locationtech.rasterframes._
    import org.locationtech.rasterframes.datasource.raster._
    import org.locationtech.rasterframes.encoders.CatalystSerializer._
    import geotrellis.raster._
    import geotrellis.vector.Extent
    import org.locationtech.jts.geom.Point
    import org.apache.spark.sql.functions.col

    object SparkSQLExample {

        def main(args: Array[String]) {

            implicit val spark = SparkSession.builder()
            .master("local[*]").appName("RasterFrames")
            .withKryoSerialization.getOrCreate().withRasterFrames
            spark.sparkContext.setLogLevel("ERROR")


            import spark.implicits._

            val example = "https://raw.githubusercontent.com/locationtech/rasterframes/develop/core/src/test/resources/LC08_B7_Memphis_COG.tiff"
            val rf = spark.read.raster.from(example).load()

            val rf_value_at_point = udf((extentEnc: Row, tile: Tile, point: Point) => {
              val extent = extentEnc.to[Extent]
              Raster(tile, extent).getDoubleValueAtPoint(point)
            })

            val spark_file:SparkSession = SparkSession.builder()
            .master("local[1]")
            .appName("SparkByExamples")
            .getOrCreate()

            spark_file.sparkContext.setLogLevel("ERROR")

            println("spark read csv files from a directory into RDD")
            val rddFromFile = spark_file.sparkContext.textFile("point.csv")
            println(rddFromFile.getClass)

            def customF(str: String): String = {
                val lat = str.split('|')(2).toDouble;
                val long = str.split('|')(3).toDouble;
                val point = st_makePoint(long, lat)
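                // The NullPointerException in the stack trace below (SparkSQLExample.scala:49) is thrown from this .where(...) call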
                val test = rf.where(st_intersects(rf_geometry(col("proj_raster")), point))
                  .select(rf_value_at_point(rf_extent(col("proj_raster")), rf_tile(col("proj_raster")), point) as "value")
                return test.toString()
            }
            val rdd2=rddFromFile.map(f=> customF(f))
            rdd2.foreach(t=>println(t))
            spark.stop()

      }
    }

When I run it, I get a NullPointerException. Any help is appreciated.

    java.lang.NullPointerException
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:182)
        at org.apache.spark.sql.Dataset$.apply(Dataset.scala:64)
        at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:3416)
        at org.apache.spark.sql.Dataset.filter(Dataset.scala:1490)
        at org.apache.spark.sql.Dataset.where(Dataset.scala:1518)
        at main.scala.sample.SparkSQLExample$.main$scala$sample$SparkSQLExample$$customF$1(SparkSQLExample.scala:49)

Tags: scala, apache-spark, apache-spark-sql, rasterframes

Solution


The function which is being mapped over the RDD (customF) is not null safe. Try calling customF(null) and see what happens. If it throws an exception, then you will have to make sure that rddFromFile doesn't contain any null/missing values.
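For example, the line RDD could be checked and cleaned before mapping. This is just a sketch; the field count is assumed to match the '|' split used in customF:

    // Drop null/blank lines and lines with too few '|'-separated fields
    // before applying customF (sketch; field positions are assumed to match
    // the split inside customF above).
    val cleaned = rddFromFile
      .filter(line => line != null && line.trim.nonEmpty)
      .filter(line => line.split('|').length > 3)

    val rdd2 = cleaned.map(customF)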

It is a little hard to tell if that is exactly where the issue is. I think the stack trace of the exception is less helpful than usual because the function is run in Spark tasks on the workers.

If that is the issue, you could rewrite customF to handle the case where str is null or change the parameter type to Option[String] (and tweak the logic accordingly).
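A minimal sketch of the first option, guarding against a null or blank line inside customF (the "missing" fallback string is only an example value; the rest of the body is unchanged from the question):

    def customF(str: String): String = {
        if (str == null || str.trim.isEmpty) {
            "missing" // example fallback for null/blank lines
        } else {
            val fields = str.split('|')
            val lat = fields(2).toDouble
            val long = fields(3).toDouble
            val point = st_makePoint(long, lat)
            rf.where(st_intersects(rf_geometry(col("proj_raster")), point))
              .select(rf_value_at_point(rf_extent(col("proj_raster")), rf_tile(col("proj_raster")), point) as "value")
              .toString()
        }
    }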

By the way, the same thing applies to UDFs (such as rf_value_at_point above). They need to either

  1. Accept Option types as input
  2. Handle the case where each argument is null, or
  3. Only be applied to data with no missing values.
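For instance, a Scala UDF can take Option for a nullable input and return an Option, so a null value never reaches the function body unguarded. This is only a sketch; the column names and the Point-based variant are hypothetical:

    import org.apache.spark.sql.functions.{col, udf}

    // Option input/output: a null in the column arrives as None instead of null.
    val halfElevation = udf((v: Option[Double]) => v.map(_ / 2.0))

    // For reference-type arguments (e.g. a JTS Point), wrap in Option explicitly.
    val pointX = udf((p: org.locationtech.jts.geom.Point) => Option(p).map(_.getX))

    // Hypothetical usage:
    // df.withColumn("half_elevation", halfElevation(col("elevation")))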
