Why does this List[String] to DataFrame conversion throw a NullPointerException in Spark Scala?

Problem Description

The following code snippet causes a NullPointerException. I am not sure whether the exception happens only on certain rows or always, because the DataFrame is large and I cannot pinpoint the offending row.

def removeUnwantedLetters(str: String): String = {
    // Keep only all-lowercase words longer than one character,
    // e.g. "Hello world it is 2019!" -> "world it is"
    str.split("\\W+").filter(word => (word.matches("[a-z]+") && (word.length > 1))).mkString(" ")
}

val myudf = spark.udf.register("learningUDF", (f1: String, f2: String) => {
    if (f1 != null && f2 != null) {
        val preproList = List(removeUnwantedLetters(f2.toLowerCase))

        if (preproList.size > 0) {
            // This is the line that turns out to throw the NPE (see below)
            val key_items = preproList.toDF("Description")
        }
    }

    (1, 1)
})



mydataframe.withColumn("pv", myudf($"f1", $"f2")).show

The full code is large, so apologies for not pasting all of it here; I have reduced the failing code as much as I could. Below is the exception I get in my actual code:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 274.0 failed 4 times, most recent failure: Lost task 0.3 in stage 274.0 (TID 23387, 10.62.145.186, executor 2): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string, string, string, string, string, string, string, string, string, string, string, string) => struct<_1:int,_2:double>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ScalaUDF$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_26$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:254)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at $anonfun$1.apply(<console>:100)
    at $anonfun$1.apply(<console>:82)
    ... 22 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
  ... 66 elided
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string, string, string, string, string, string, string, string, string, string, string, string) => struct<_1:int,_2:double>)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ScalaUDF$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_26$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:254)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  ... 3 more
Caused by: java.lang.NullPointerException
  at $anonfun$1.apply(<console>:100)
  at $anonfun$1.apply(<console>:82)
  ... 22 more

Through trial and error I found that the line `val key_items = preproList.toDF("Description")` is what causes the NPE, because if I simply change it to `val key_items = preproList`, everything works fine.

Can anyone please let me know when `val key_items = preproList.toDF("Description")` would give a `NullPointerException`?

Update

It seems we cannot create a DataFrame inside a UDF: I tried changing `val key_items = preproList.toDF("Description")` to `val key_items = List(1,2,3,4).toDF("VL")`, and to my surprise it failed with the same exception.

Is it not possible to create a temporary DataFrame inside a UDF?

Update 2

I am trying to create a temporary DataFrame so that I can use the JohnSnowLabs Norvig spell-correction model with its pipeline, as follows:

val nlpPipeline = new Pipeline().setStages(Array(
  new DocumentAssembler().setInputCol("Description").setOutputCol("document"),
  new Tokenizer().setInputCols("document").setOutputCol("tokens"),
  norvigspell.setInputCols("tokens").setOutputCol("Description_corrected"),
  new Finisher().setInputCols("Description_corrected")
))

val dbDF = preproList.toDF("Description")

val spellcorrectedDF = dbDF.transform(df => nlpPipeline.fit(df).transform(df))

Tags: scala, apache-spark

Solution


The short answer is: no, you cannot create a DataFrame (or Dataset) inside a UDF. A UDF operates on individual row values, so it must return a simple value that can be stored in a new column; think of UDFs as computed columns. If you could create a DataFrame inside a UDF, it would have only one row, and you would be creating many of them, one per row of the parent DataFrame.
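To make the failure concrete, here is a minimal sketch of the mechanism (assuming a standard spark-shell session named `spark`). `toDF` needs the active `SparkSession`; on the driver it is fully initialized, but the copy of the session captured by a UDF closure is deserialized on the executors with its transient internals (the `SparkContext`, the session state) set to null, and dereferencing those is what surfaces as the `NullPointerException`:

import org.apache.spark.sql.functions.udf
import spark.implicits._

// Driver side: fine, the SparkSession is fully initialized here.
val driverSideDF = List("foo", "bar").toDF("Description")

// Executor side: the session reference carried along with the closure is
// unusable there, so the toDF call below fails with a NullPointerException.
val brokenUdf = udf { (s: String) =>
    List(s).toDF("Description").count() // NPE at runtime on the executor
}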

Now, it is hard to tell from your code what you are trying to do. As far as I can see, you are attempting some kind of character cleanup, storing the result in a `key_items` value (as a DataFrame) and never using it... and in the end returning the constant pair `(1, 1)` regardless of the previous computation. The fact that your UDF takes two parameters while you only use one also confuses me.

My guess is that you want to compute a description based on the value of one given column (you only use one), so something like the following would give you a similar result:

def removeUnwantedLetters(str: String): String = {
    str.split("\\W+").filter(word => (word.matches("[a-z]+") && (word.length > 1))).mkString(" ")
}

val myudf = spark.udf.register("learningUDF", (f1: String) => {
    if (f1 != null) {
        removeUnwantedLetters(f1.toLowerCase)
    } else ""
})

// This seems to be the DataFrame you are looking for
val descriptionDF = mydataframe
  .withColumn("Description", myudf($"f2"))
  .select("Description")

As before, Spark creates the `Description` column by invoking the UDF on every value of the DataFrame. Then, with `.select("Description")`, you get a new DataFrame containing only the `Description` column.
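If the end goal from Update 2 is the spell correction, the pipeline can then be fitted and applied to this driver-side DataFrame as a whole; a sketch reusing the `nlpPipeline` (and its `norvigspell` stage) from the question:

// Run the JohnSnowLabs pipeline once over the whole DataFrame on the driver,
// instead of building a one-row DataFrame per row inside a UDF.
val spellcorrectedDF = nlpPipeline.fit(descriptionDF).transform(descriptionDF)
spellcorrectedDF.show(false)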

