首页 > 解决方案 > 是否可以等到 EMR 集群终止?

问题描述

我正在尝试编写一个组件来启动 EMR 集群,在该集群上运行 Spark 管道,然后在管道完成后关闭该集群。

我已经创建了集群并设置了权限以允许我的主集群的工作机器启动 EMR 集群。但是,我正在努力调试创建的集群并等待管道结束。这是我现在拥有的代码。注意我使用的是 Spark Scala,但这非常接近标准 Java 代码:

val runSparkJob = new StepConfig()
  .withName("Run Pipeline")
  .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
  .withHadoopJarStep(
    new HadoopJarStepConfig()
      .withJar("/path/to/jar")
      .withArgs(
        "spark-submit",
        "etc..."
      )
  )

// Create a cluster and run the Spark job on it
val clusterName = "REDACTED Cluster"
val createClusterRequest =
  new RunJobFlowRequest()
    .withName(clusterName)
    .withReleaseLabel(Configs.EMR_RELEASE_LABEL)
    .withSteps(enableDebugging, runSparkJob)
    .withApplications(new Application().withName("Spark"))
    .withLogUri(Configs.LOG_URI_PREFIX)
    .withServiceRole(Configs.SERVICE_ROLE)
    .withJobFlowRole(Configs.JOB_FLOW_ROLE)
    .withInstances(
      new JobFlowInstancesConfig()
        .withEc2SubnetId(Configs.SUBNET)
        .withInstanceCount(Configs.INSTANCE_COUNT)
        .withKeepJobFlowAliveWhenNoSteps(false)
        .withMasterInstanceType(Configs.MASTER_INSTANCE_TYPE)
        .withSlaveInstanceType(Configs.SLAVE_INSTANCE_TYPE)
    )

val newCluster = emr.runJobFlow(createClusterRequest)

我有两个具体问题:

  1. emr.runJobFlow提交结果后立即返回调用。有什么方法可以让它阻塞直到集群关闭或等到工作流程结束?

  2. 我的集群实际上没有出现,当我转到AWS Console -> EMR -> Events视图时,我看到了一个失败:

    Amazon EMR Cluster j-XXX (REDACTED...) has terminated with errors at 2019-06-13 19:50 UTC with a reason of VALIDATION_ERROR.

有什么方法可以在我的 Java/Scala 应用程序中以编程方式解决这个错误?

标签: scalaamazon-web-servicesapache-sparkamazon-emraws-step-config

解决方案


是的,很有可能等到 EMR 集群终止。

有些服务员会阻止执行,直到集群(即作业流)达到某个状态。

val newCluster = emr.runJobFlow(createClusterRequest);
val describeRequest = new DescribeClusterRequest()
    .withClusterId(newCluster.getClusterId())

// Wait until terminated
emr.waiters().clusterTerminated().run(new WaiterParameters(describeRequest))

另外,如果要获取集群的状态(即工作流),可以调用EMR客户端的describeCluster函数。查看链接文档,因为您可以获得有关集群的状态和状态信息,以确定它是成功还是错误。

val result = emr.describeCluster(describeRequest)

注意:不是最好的Java-er,所以以上是我最好的猜测以及它是如何根据文档工作的,但我还没有测试过上面的内容。


推荐阅读