UDF works locally but fails on the executors

Problem description

I have the following function:

  def timestampConverter(dt: String): Option[Int] = {
    val timeStamp = Option(dt).getOrElse(return None)
    return Some(timeStamp.replace("-", "").substring(0, 8).toInt)
  }
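
(For reference: this just turns a date string such as "2018-06-26" into Some(20180626), and returns None for null input. In case the non-local returns look suspicious, an equivalent version without them is:

  def timestampConverter(dt: String): Option[Int] =
    Option(dt).map(_.replace("-", "").substring(0, 8).toInt)

and it does not change anything; the failure described below is unrelated to the function body — see the solution.)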

I register it as a UDF:

spark.udf.register("timestampToKey", timestampConverter(_:String))

Then I use it in my SQL query:

sql("select timestampToKey(dateti) FROM table").show

It works fine when I run it with --master local.

The problem appears when I run it on the cluster (--master yarn). It then fails with the following error:

    spark-submit --class "com.test.job.Driver" --master yarn test-job_2.11.jar

18/06/26 03:20:19 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
 at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
 at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1070)
18/06/26 03:20:52 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, host1, executor 1): java.lang.ExceptionInInitializerError
 at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
 at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
 at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
 at org.apache.spark.scheduler.Task.run(Task.scala:100)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
 at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
 at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
 at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
 at scala.Option.getOrElse(Option.scala:121)
 at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
 at com.test.job.SparkSessionWrapper$class.sparkHive(SparkSessionWrapper.scala:14)
 at com.test.job.Handler$.sparkHive$lzycompute(Handler.scala:6)
 at com.test.job.Handler$.sparkHive(Handler.scala:6)
 at com.test.job.Handler$.<init>(Handler.scala:10)
 at com.test.job.Handler$.<clinit>(Handler.scala)
 ... 18 more

18/06/26 03:20:54 WARN TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2, host2, executor 1): java.lang.NoClassDefFoundError: Could not initialize class com.test.job.Handler$
 at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
 at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
 at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
 at org.apache.spark.scheduler.Task.run(Task.scala:100)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

18/06/26 03:20:54 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, host3, executor 2): java.lang.NoClassDefFoundError: Could not initialize class com.test.job.Handler$
 at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
 at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
 at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
 at org.apache.spark.scheduler.Task.run(Task.scala:100)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1436)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1424)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1423)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
 at scala.Option.foreach(Option.scala:257)
 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1651)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1606)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1595)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
 at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
 at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
 at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
 at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
 at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2773)
 at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
 at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
 at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
 at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
 at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2803)
 at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
 at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
 at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
 at com.test.job.Driver$.main(Driver.scala:8)
 at com.test.job.Driver.main(Driver.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:733)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:177)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:202)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:116)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.test.job.Handler$
 at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
 at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
 at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
 at org.apache.spark.scheduler.Task.run(Task.scala:100)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

Any idea what could be wrong? Note that it only fails when the SQL query uses the UDF; queries without the UDF work fine.

OK, I have narrowed the problem down a bit, but I still don't understand why it fails. Here is my application:

//SparkSessionWrapper.scala
package com.test.job

import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {
  lazy val sparkHive: SparkSession = {
    SparkSession
      .builder()
      .appName("test")
      .enableHiveSupport()
      .getOrCreate()
  }
}

//UdfFunctions.scala
package com.test.job

trait UdfFunctions {

  def timestampConverter(dt: String): Option[Int] = {
    val timeStamp = Option(dt).getOrElse(return None)
    return Some(timeStamp.replace("-", "").substring(0, 8).toInt)
  }
}

//Handler.scala
package com.test.job

import org.apache.spark.sql.DataFrame
import scala.io.Source

object Handler extends SparkSessionWrapper with UdfFunctions {
  import sparkHive.sql
  sparkHive.udf.register("timestampToKey", timestampConverter(_:String))
  def executeSqlQuery(SqlString: String): DataFrame = {
    return sparkHive.sql(SqlString)
  }
}

//Driver.scala
package com.test.job

object Driver {
  def main(args: Array[String]) = {
    Handler.executeSqlQuery("select timestampToKey(dt) FROM table").show
  }
}

However, if I move the registration sparkHive.udf.register("timestampToKey", timestampConverter(_:String)) inside executeSqlQuery, it works. I'd like to understand why I get the error "A master URL must be set in your configuration" when I perform udf.register outside the method.
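
For completeness, here is the working variant: a minimal sketch of Handler with the registration moved into the method body, so that nothing in the object's constructor forces the lazy SparkSession:

//Handler.scala - working variant (sketch)
package com.test.job

import org.apache.spark.sql.DataFrame

object Handler extends SparkSessionWrapper with UdfFunctions {
  def executeSqlQuery(sqlString: String): DataFrame = {
    // runs on the driver at call time, not in Handler's static initializer
    sparkHive.udf.register("timestampToKey", timestampConverter(_: String))
    sparkHive.sql(sqlString)
  }
}

Re-registering on every call is redundant but harmless; register just replaces the function previously registered under that name.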

Tags: apache-spark, apache-spark-sql

Solution
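
The stack trace itself points at the cause. Look at the "Caused by" section of the first lost task:

    Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
     at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
     ...
     at com.test.job.Handler$.sparkHive$lzycompute(Handler.scala:6)
     at com.test.job.Handler$.<clinit>(Handler.scala)

A SparkSession is being constructed inside the static initializer (<clinit>) of the Handler object, and this is happening on an executor, where no master URL or driver configuration exists.

The chain of events: timestampConverter(_:String) eta-expands into an anonymous function (the Handler$$anonfun$1 in the trace). Because timestampConverter is mixed into Handler from the UdfFunctions trait, that function calls a method on the Handler singleton. Spark serializes the function and ships it to the executors, and the first task that invokes it forces the JVM to initialize Handler$ there. That runs the object's constructor, which executes sparkHive.udf.register(...); registering forces the lazy sparkHive, and SparkSession.builder().getOrCreate() then tries to build a brand-new SparkContext on the executor, which fails with "A master URL must be set in your configuration". The JVM remembers the failed class initialization, which is why every retry instead reports NoClassDefFoundError: Could not initialize class com.test.job.Handler$.

With --master local there is only one JVM: the driver initializes Handler$ (and creates the session) before any task runs, so the executor-side initialization never happens and the job works.

This also explains the workaround: with the register call moved into executeSqlQuery, Handler's constructor no longer touches sparkHive, so initializing Handler$ on an executor is harmless; the lazy session is only ever forced on the driver.

A cleaner fix is to keep the UDF body in an object that holds no Spark state, so the function shipped to the executors never drags the session-holding object along, and to register UDFs as an explicit driver-side step. A minimal sketch, keeping the names from the question except for a hypothetical registerUdfs helper:

//UdfFunctions.scala - plain object with no Spark state, safe to initialize anywhere
package com.test.job

object UdfFunctions {
  def timestampConverter(dt: String): Option[Int] =
    Option(dt).map(_.replace("-", "").substring(0, 8).toInt)
}

//Handler.scala - registration is an explicit step that runs on the driver
package com.test.job

import org.apache.spark.sql.DataFrame

object Handler extends SparkSessionWrapper {
  // called once from main(), before any query uses the UDF
  def registerUdfs(): Unit =
    sparkHive.udf.register("timestampToKey", UdfFunctions.timestampConverter _)

  def executeSqlQuery(sqlString: String): DataFrame = sparkHive.sql(sqlString)
}

//Driver.scala
package com.test.job

object Driver {
  def main(args: Array[String]): Unit = {
    Handler.registerUdfs()
    Handler.executeSqlQuery("select timestampToKey(dt) FROM table").show
  }
}

Now the function passed to register references only the stateless UdfFunctions object, whose initialization on an executor does nothing, and the SparkSession is created exactly once, on the driver.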

