scala - 使用 API 从 Play Framework 控制器终止/停止/终止 Spark 作业的更好方法?
问题描述
我确实有一个名为的控制器SparkJobController
class SparkJobController @Inject()(cc: ControllerComponents, actorSystem: ActorSystem)(implicit exec: ExecutionContext)
extends AbstractController(cc) {
val jobsMap = scala.collection.mutable.Map.empty[String, SparkAppHandle]
/* POST request that takes ID to pass to the spark-submit (to be) jar */
def run: Action[AnyContent] = Action.async { request =>
request.body.asJson.map { json =>
Json.fromJson[String](json).asOpt match {
case Some(id) =>
val job = new SparkLauncher()
.setSparkHome("/usr/local/spark")
.setMaster("local[*]")
.setAppName("spark-app")
.setAppResource("/usr/abc/spark.jar")
.setMainClass("example.job.MainClass")
.addAppArgs(id)
val jobHandle = job.startApplication()
jobsMap += (appId -> jobHandle)
Future.successful(Ok(Json.toJson(appId)))
case None =>
Future.successful(BadRequest)
}
}.getOrElse(Future.successful(BadRequest))
}
/* POST request to kill a job taking ID returned by run API */
def stop: Action[AnyContent] = Action.async { request =>
request.body.asJson.map { json =>
Json.fromJson[String](json).asOpt match {
case Some(appId) =>
jobsMap.get(appId).map { job =>
job.kill()
Future.successful(Ok(Json.toJson(s"Successfully stopped application with appId = $appId.")))
}.getOrElse(Future.successful(NotFound(Json.toJson("Couldn't find in queue."))))
case None =>
Future.successful(BadRequest)
}
}.getOrElse(Future.successful(BadRequest))
}
}
我想将jobsMap
数据库或缓存中的某个地方(Redis
可能)持久化。我怎样才能做到这一点?或者如果没有,我应该怎么做才能制作一个从 API(运行)获取作业请求并提供 API(停止)来停止/杀死/终止正在运行的作业的排队系统。
最优雅的方法是什么?
供参考,
- 火花版本是
2.3.1
- 播放版本是
2.6.13
- 斯卡拉版本是
2.11.11
解决方案
推荐阅读
- apache-flink - 如何从 flink 中的本地套接字消费,端口号后面有“/*”,其中 * 是源的其余部分?
- sql - 如何修复 Oracle 中 SQL 中的“ORA-01801:日期格式对于内部缓冲区而言太长”错误
- c# - 用于提高状态行性能的 Powershell 作业
- c++ - 关于意外令牌 c2760 的 LinkedList 头文件错误
- html - 如何使用条件渲染从 HTML 音频标签播放音频?
- react-native - 如何在我的 react native 应用程序中首次运行时下载配置和资产?
- google-cloud-platform - Google PubSub - serviceAccount:gmail-api-push@system.gserviceaccount.com 不存在
- python - 数据框中的列似乎不存在
- javascript - 如何将 ajax.reload 与 jQuery 数据表一起使用
- python-3.x - Amazon S3 select_object_content 查询调用