apache-spark - Mesos 上 Apache Spark 的自定义状态存储提供程序
问题描述
我为 Apache Spark 2.3.0 编写了自定义状态存储和状态存储提供程序,并尝试使用附加参数部署作业:
--conf spark.sql.streaming.stateStore.providerClass=com.sample.state.CustomStateStoreProvider
对于运行 Spark 作业,我使用 Marathon 和 Mesos,并且该作业在异常开始后失败:
java.lang.ClassNotFoundException: com.sample.state.CustomStateStoreProvider
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:235)
at org.apache.spark.sql.execution.streaming.state.StateStoreProvider$.create(StateStore.scala:213)
at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.stateStoreCustomMetrics(statefulOperators.scala:121)
at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.metrics(statefulOperators.scala:86)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.metrics$lzycompute(statefulOperators.scala:251)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.metrics(statefulOperators.scala:251)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:58)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
这是运行作业的命令:
/spark/bin/spark-submit \
--repositories "http://127.0.0.1:80/sbt-all" \
--packages com.sample:pipelines:0.1.0 \
--class com.sample.TestApplication \
--conf spark.sql.streaming.stateStore.providerClass=com.sample.state.CustomStateStoreProvider \
/spark/examples/jars/spark-examples_2.11-2.3.0.jar
这两个类com.sample.TestApplication
都com.sample.state.CustomStateStoreProvider
位于com.sample:pipelines:0.1.0
包中,我已经检查了好几次。如果没有该spark.sql.streaming.stateStore.providerClass
参数,应用程序将启动并运行良好。
我已经尝试使用驱动程序和执行程序的附加类路径并使用--jars
位于 HDFS 或通过 HTTP 的 JAR 的参数来提交作业。
PS:当我尝试在本地开始工作时,我没有任何问题,在这种情况下一切正常。
解决方案
好吧,一般来说,需要将spark.sql.streaming.stateStore.providerClass
参数的值括在引号中:--conf spark.sql.streaming.stateStore.providerClass="com.sample.state.CustomStateStoreProvider"
. 没有它,值后面的空格将包含在值中,Spark 将查找com.sample.state.CustomStateStoreProvider
类(在行尾带有空格符号)并且无法找到它。其他一切都很好:)
推荐阅读
- android - 无论形状如何,如何强制 Widget 占用它周围的可用空间?
- html - 如何使用线性渐变制作异形背景?
- java - 如何从 Android 中的 Worker 类访问我的 Rooms 数据库的存储库?
- c++ - 有条件地定义容器类的方法
- python - 在具有一定距离的外圆内生成随机点
- python-3.x - 如何使用 gpiozero 按钮方法检查某个按钮何时被按下
- linux - 从 proc/sys/kernel/ 读取数据
- google-sheets - Google 表格 - 创建过去 90 天平均收盘价的折线图?
- angular - 按钮路由器链接不起作用
- javascript - 我使用了一个表达式来减少 javascript 中的 for 循环,但它不起作用