首页 > 解决方案 > 使用 API 从 Play Framework 控制器终止/停止/终止 Spark 作业的更好方法?

问题描述

我确实有一个名为的控制器SparkJobController

class SparkJobController @Inject()(cc: ControllerComponents, actorSystem: ActorSystem)(implicit exec: ExecutionContext)
  extends AbstractController(cc) {
  val jobsMap = scala.collection.mutable.Map.empty[String, SparkAppHandle]

  /* POST request that takes ID to pass to the spark-submit (to be) jar */
  def run: Action[AnyContent] = Action.async { request =>
    request.body.asJson.map { json =>
      Json.fromJson[String](json).asOpt match {
        case Some(id) =>
          val job = new SparkLauncher()
            .setSparkHome("/usr/local/spark")
            .setMaster("local[*]")
            .setAppName("spark-app")
            .setAppResource("/usr/abc/spark.jar")
            .setMainClass("example.job.MainClass")
            .addAppArgs(id)

          val jobHandle = job.startApplication()

          jobsMap += (appId -> jobHandle)
          Future.successful(Ok(Json.toJson(appId)))
        case None =>
          Future.successful(BadRequest)
      }
    }.getOrElse(Future.successful(BadRequest))
  }

  /* POST request to kill a job taking ID returned by run API */
  def stop: Action[AnyContent] = Action.async { request =>
    request.body.asJson.map { json =>
      Json.fromJson[String](json).asOpt match {
        case Some(appId) =>
          jobsMap.get(appId).map { job =>
            job.kill()
            Future.successful(Ok(Json.toJson(s"Successfully stopped application with appId = $appId.")))
          }.getOrElse(Future.successful(NotFound(Json.toJson("Couldn't find in queue."))))
        case None =>
          Future.successful(BadRequest)
      }
    }.getOrElse(Future.successful(BadRequest))
  }
}

我想将jobsMap数据库或缓存中的某个地方(Redis可能)持久化。我怎样才能做到这一点?或者如果没有,我应该怎么做才能制作一个从 API(运行)获取作业请求并提供 API(停止)来停止/杀死/终止正在运行的作业的排队系统。

最优雅的方法是什么?

供参考,

标签: scalaapache-sparkplayframework

解决方案


推荐阅读