apache-spark - UDF 在本地工作,但在执行程序上失败
问题描述
我有以下功能:
def timestampConverter(dt: String): Option[Int] = {
val timeStamp = Option(dt).getOrElse(return None)
return Some(timeStamp.replace("-", "").substring(0, 8).toInt)
}
我将其注册为 UDF 函数:
spark.udf.register("timestampToKey", timestampConverter(_:String))
然后我在我的 SQL 查询中使用它:
sql("select timestampToKey(dateti) FROM table").show
当我在--master local上运行它时它工作正常
当我在cluster (--master yarn)上运行它时出现问题。然后它失败并出现以下错误:
spark-submit --class "com.test.job.Driver" --master yarn test-job_2.11.jar
18/06/26 03:20:19 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1070)
18/06/26 03:20:52 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, host1, executor 1): java.lang.ExceptionInInitializerError
at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
at org.apache.spark.scheduler.Task.run(Task.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
at com.test.job.SparkSessionWrapper$class.sparkHive(SparkSessionWrapper.scala:14)
at com.test.job.Handler$.sparkHive$lzycompute(Handler.scala:6)
at com.test.job.Handler$.sparkHive(Handler.scala:6)
at com.test.job.Handler$.<init>(Handler.scala:10)
at com.test.job.Handler$.<clinit>(Handler.scala)
... 18 more
18/06/26 03:20:54 WARN TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2, host2, executor 1): java.lang.NoClassDefFoundError: Could not initialize class com.test.job.Handler$
at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
at org.apache.spark.scheduler.Task.run(Task.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/06/26 03:20:54 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, host3, executor 2): java.lang.NoClassDefFoundError: Could not initialize class com.test.job.Handler$
at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
at org.apache.spark.scheduler.Task.run(Task.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1436)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1424)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1651)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1606)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1595)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2773)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2803)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
at at com.test.job.Driver$.main(Driver.scala:8)
at at com.test.job.Driver.main(Driver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:733)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:177)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:202)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:116)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class at com.test.job.Handler$
at at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
at at com.test.job.Handler$$anonfun$1.apply(Handler.scala:10)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
at org.apache.spark.scheduler.Task.run(Task.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
知道会有什么问题吗?请注意,只有当我使用 UDF 运行 SQL 查询时它才会失败,没有 UDF> 可以正常工作。
好的,我稍微缩小了问题的范围,但仍然不明白为什么它会失败。这是我的申请
//SparkSessionWrapper.scala
package com.test.job
import org.apache.spark.sql.SparkSession
trait SparkSessionWrapper {
lazy val sparkHive: SparkSession = {
SparkSession
.builder()
.appName("test")
.enableHiveSupport()
.getOrCreate()
}
}
//UdfFunctions.scala
package com.test.job
trait UdfFunctions {
def timestampConverter(dt: String): Option[Int] = {
val timeStamp = Option(dt).getOrElse(return None)
return Some(timeStamp.replace("-", "").substring(0, 8).toInt)
}
}
//Handler.scala
package com.test.job
import org.apache.spark.sql.DataFrame
import scala.io.Source
object Handler extends SparkSessionWrapper with UdfFunctions{
import sparkHive.sql
sparkHive.udf.register("timestampToKey", timestampConverter(_:String))
def executeSqlQuery(SqlString: String): DataFrame = {
return sparkHive.sql(SqlString)
}
}
//Driver.scala
package com.test.job
object Driver {
def main(args: Array[String]) = {
Handler.executeSqlQuery("select timestampToKey(dt) FROM table").show
}
}
但是,如果我将 UDF 寄存器sparkHive.udf.register("timestampToKey", timestampConverter(_:String))
放在 executeSqlQuery 中,那么它就可以工作。我想知道为什么A master URL must be set in your configuration
当我在外部方法执行 udf.register 时会收到此错误。
解决方案
推荐阅读
- c# - 在 UnitTest 中处理调度程序计时器
- java - Redis如何将值存储为json
- angular - 重新初始化组件时,AGM 是否保留 Google Map 的会话?
- amazon-web-services - 无需更改客户端代码即可轮换 IAM 用户访问密钥
- javascript - 如何更改 Ant Design 卡片组件的标题字体?
- centos - MarkLogic Server 安装 - 服务器安装页面的访问被拒绝 302
- python - 如何修复pytorch'RuntimeError:类型为torch.cuda.LongTensor但发现类型为torch.LongTensor的预期对象'
- sql-server - 从第一个表中获取所有数据,即使连接表中不存在连接条目
- c++ - 如何解决索引和排序问题
- node.js - 如何从节点 js 中的特定路径执行命令?