scala - 是否可以等到 EMR 集群终止?
问题描述
我正在尝试编写一个组件来启动 EMR 集群,在该集群上运行 Spark 管道,然后在管道完成后关闭该集群。
我已经创建了集群并设置了权限以允许我的主集群的工作机器启动 EMR 集群。但是,我正在努力调试创建的集群并等待管道结束。这是我现在拥有的代码。注意我使用的是 Spark Scala,但这非常接近标准 Java 代码:
val runSparkJob = new StepConfig()
.withName("Run Pipeline")
.withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
.withHadoopJarStep(
new HadoopJarStepConfig()
.withJar("/path/to/jar")
.withArgs(
"spark-submit",
"etc..."
)
)
// Create a cluster and run the Spark job on it
val clusterName = "REDACTED Cluster"
val createClusterRequest =
new RunJobFlowRequest()
.withName(clusterName)
.withReleaseLabel(Configs.EMR_RELEASE_LABEL)
.withSteps(enableDebugging, runSparkJob)
.withApplications(new Application().withName("Spark"))
.withLogUri(Configs.LOG_URI_PREFIX)
.withServiceRole(Configs.SERVICE_ROLE)
.withJobFlowRole(Configs.JOB_FLOW_ROLE)
.withInstances(
new JobFlowInstancesConfig()
.withEc2SubnetId(Configs.SUBNET)
.withInstanceCount(Configs.INSTANCE_COUNT)
.withKeepJobFlowAliveWhenNoSteps(false)
.withMasterInstanceType(Configs.MASTER_INSTANCE_TYPE)
.withSlaveInstanceType(Configs.SLAVE_INSTANCE_TYPE)
)
val newCluster = emr.runJobFlow(createClusterRequest)
我有两个具体问题:
emr.runJobFlow
提交结果后立即返回调用。有什么方法可以让它阻塞直到集群关闭或等到工作流程结束?我的集群实际上没有出现,当我转到
AWS Console -> EMR -> Events
视图时,我看到了一个失败:Amazon EMR Cluster j-XXX (REDACTED...) has terminated with errors at 2019-06-13 19:50 UTC with a reason of VALIDATION_ERROR.
有什么方法可以在我的 Java/Scala 应用程序中以编程方式解决这个错误?
解决方案
是的,很有可能等到 EMR 集群终止。
有些服务员会阻止执行,直到集群(即作业流)达到某个状态。
val newCluster = emr.runJobFlow(createClusterRequest);
val describeRequest = new DescribeClusterRequest()
.withClusterId(newCluster.getClusterId())
// Wait until terminated
emr.waiters().clusterTerminated().run(new WaiterParameters(describeRequest))
另外,如果要获取集群的状态(即工作流),可以调用EMR客户端的describeCluster函数。查看链接文档,因为您可以获得有关集群的状态和状态信息,以确定它是成功还是错误。
val result = emr.describeCluster(describeRequest)
注意:不是最好的Java-er,所以以上是我最好的猜测以及它是如何根据文档工作的,但我还没有测试过上面的内容。
推荐阅读
- c# - 实时过滤心率数据
- dart - 从 HttpClient 请求返回响应正文
- windows - 将文件以五个为一组移动到文件夹中?
- postgresql - 将PostgreSQL中的jsonb转换为无循环的行
- python - tensorflow js中的Keras模型
- syntax - 为什么在 where 子句中存在 trait bound 而在函数签名中却没有?
- c# - 在 asp .net 核心中为 MediatR 库的发送和发布方法添加通用处理程序
- c - 如何以可移植的方式从指向结构成员的指针计算指向结构开头的指针?
- ethereum - 以solidity返回结构数组
- ios - 如何使用 XIB 将自定义 collectionviewCell 连接到重用标识符 - Swift