Spark java.lang.NullPointerException when filtering a Spark DataFrame inside a foreach iterator

Problem description

I have two Spark DataFrames. I want to iterate over one of them with a foreach and, for each row, fetch the records with the matching someId from the other DataFrame.

Every time I do this, a java.lang.NullPointerException occurs.

I have posted the code below, with comments inside the foreach loop. I tried three approaches, but the same error occurred every time.

Please help me resolve this.

val schListDf = spark.read.format("csv")
  .option("header", "true")
  .load("/home/user/projects/scheduled.csv")

schListDf.createOrReplaceTempView("scheduled")

val trsListDf = spark.read.format("csv")
  .option("header", "true")
  .load("/home/user/projects/transaction.csv")

trsListDf.createOrReplaceTempView("transaction")

import spark.implicits._ // needed for the $"someid" column syntax below

// THIS WORKS FINE
val df3 = spark.sql("select * from transaction limit 5").show()

schListDf.foreach(row => {
  if (row(2) != null) {

    // I TRIED THIS WAY FIRST, BUT THE SAME ERROR OCCURRED
    val df = spark.sql("select * from transaction where someid = '" + row(2) + "'")

    // I TRIED THIS WAY SECOND (WITHOUT THE someid FILTER), BUT THE SAME ERROR OCCURRED
    val df2 = spark.sql("select * from transaction limit 5")

    // I ALSO TRIED THIS WAY (FILTERING THE DF DIRECTLY), BUT THE SAME ERROR OCCURRED
    val filteredDataListDf = trsListDf.filter($"someid" === row(2))
  }
})

18/12/02 10:36:34 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NullPointerException
	at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142)
	at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
	at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:52)
	at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:48)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

(The identical java.lang.NullPointerException stack trace is logged again for tasks 1.0, 2.0 and 3.0 of stage 4.0 (TIDs 5, 6 and 7), followed by "WARN TaskSetManager: Lost task ..." messages.)

Tags: java, scala, apache-spark

Solution


Some aspects of Spark belong to the driver.

A DataFrame cannot be accessed from inside a foreach, because that code runs on the executor side.

That is the paradigm, and the same applies to RDDs and to the SparkSession: the session that spark.sql relies on simply does not exist inside an executor task, hence the NullPointerException.

In other words, the foreach itself is fine, but using a DataFrame val or spark.sql inside it is not. You would need, for example, a loop that runs on the driver instead.
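
For example, assuming scheduled.csv is small enough to collect() onto the driver, a minimal sketch of such a driver-side loop could look like this (the view and column names are taken from the question):

// collect() brings the rows to the driver; after that this is an
// ordinary local loop, so calling spark.sql inside it is safe
val scheduledRows = schListDf.collect()

scheduledRows.foreach { row =>
  if (row(2) != null) {
    val filtered = spark.sql("select * from transaction where someid = '" + row(2) + "'")
    filtered.show()
  }
}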

This is a common misunderstanding when people first start working with Spark.
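
That said, issuing one query per row is rarely what you want. An alternative not spelled out above is to express the whole lookup as a single join and let Spark distribute the work; this sketch assumes (hypothetically) that scheduled.csv also has a someid column matching transaction.csv:

// one distributed job instead of one driver-side query per row;
// "someid" as a column of the scheduled view is an assumption here
val matched = trsListDf.join(
  schListDf.filter("someid is not null").select("someid").distinct(),
  Seq("someid")
)
matched.show()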

