首页 > 解决方案 > scala中的Spark SQL执行

问题描述

我有一个具有 SQL 查询和视图名称的以下数据(alldata)。

Select_Query|viewname
select v1,v2 from conditions|cond
select w1,w2 from locations|loca

我已拆分并将其正确分配给 temptable(alldata)

val Select_Querydf = spark.sql("select Select_Query,ViewName from alldata")

当我尝试执行查询并从中注册一个临时视图或表时,它显示空指针错误。但是当我注释掉 spark.sql stmt 时,PRINTLN 会显示表中的所有值。

 Select_Querydf.foreach{row => 
          val Selectstmt = row(0).toString()
          val viewname = row(1).toString()
          println(Selectstmt+"-->"+viewname)
      spark.sql(Selectstmt).registerTempTable(viewname)//.createOrReplaceTempView(viewname)
       }
output:
select v1,v2 from conditions-->cond
select w1,w2 from locations-->loca

但是当我用 spark.sql 执行它时,它显示以下错误,请帮助我哪里出错了。

2009 年 19 月 12 日 02:43:12 错误执行程序:阶段 4.0 (TID 4) 中的任务 0.0 中的异常 java.lang.NullPointerException 在 org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:128)在 org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126) 在 org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623) 在 sparkscalacode1.SQLQueryexecutewithheader$$anonfun$main$1.apply (SQLQueryexecutewithheader.scala:36) 在 sparkscalacode1.SQLQueryexecutewithheader$$anonfun$main$1.apply(SQLQueryexecutewithheader.scala:32) 在 java.lang.Thread.run(Unknown Source) 19/12/09 02:43:12 处运行(Unknown Source) ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 次;线程“主”org.apache.spark.SparkException 中的中止作业异常:作业因阶段故障而中止:阶段 4.0 中的任务 0 失败 1 次,最近一次失败:阶段 4.0 中丢失任务 0.0(TID 4,本地主机,执行程序驱动程序): org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126) 的 org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:128) 的 java.lang.NullPointerException。 apache.spark.sql.SparkSession.sql(SparkSession.scala:623) 在 sparkscalacode1.SQLQueryexecutewithheader$$anonfun$main$1.apply(SQLQueryexecutewithheader.scala:36) 在 sparkscalacode1.SQLQueryexecutewithheader$$anonfun$main$1。

标签: scalaapache-spark

解决方案


此处不能在Dataframe中使用spark.sqlwhich 。在 Driver 中创建,在 worker 中执行,不序列化。SparkSessionforeachSparksessionforeach

我希望你有一个小清单Select_Querydf,如果有的话,你可以收集作为一个清单并如下使用它。

Select_Querydf.collect().foreach { row =>
  val Selectstmt = row.getString(0)
  val viewname = row.getString(1)
  println(Selectstmt + "-->" + viewname)
  spark.sql(Selectstmt).createOrReplaceTempView(viewname)
}

希望这可以帮助!


推荐阅读