scala - Spark 如何在停止 Spark 会话之前跟踪尝试次数?
问题描述
在停止 spark 会话之前,Spark 如何跟踪配置的 spark.yarn.maxAppAttempts?Spark 为所有重试保持相同的 application_id。基于 Git
def stop(): Unit = {
sparkContext.stop()
}
它从 SparkContext 调用以下停止函数
/**
* Shut down the SparkContext.
*/
def stop(): Unit = {
if (LiveListenerBus.withinListenerThread.value) {
throw new SparkException(s"Cannot stop SparkContext within listener bus thread.")
}
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.
if (!stopped.compareAndSet(false, true)) {
logInfo("SparkContext already stopped.")
return
}
if (_shutdownHookRef != null) {
ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
}
Utils.tryLogNonFatalError {
postApplicationEnd()
}
Utils.tryLogNonFatalError {
_ui.foreach(_.stop())
}
if (env != null) {
Utils.tryLogNonFatalError {
env.metricsSystem.report()
}
}
Utils.tryLogNonFatalError {
_cleaner.foreach(_.stop())
}
Utils.tryLogNonFatalError {
_executorAllocationManager.foreach(_.stop())
}
if (_listenerBusStarted) {
Utils.tryLogNonFatalError {
listenerBus.stop()
_listenerBusStarted = false
}
}
Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
if (_dagScheduler != null) {
Utils.tryLogNonFatalError {
_dagScheduler.stop()
}
_dagScheduler = null
}
if (env != null && _heartbeatReceiver != null) {
Utils.tryLogNonFatalError {
env.rpcEnv.stop(_heartbeatReceiver)
}
}
Utils.tryLogNonFatalError {
_progressBar.foreach(_.stop())
}
_taskScheduler = null
// TODO: Cache.stop()?
if (_env != null) {
Utils.tryLogNonFatalError {
_env.stop()
}
SparkEnv.set(null)
}
if (_statusStore != null) {
_statusStore.close()
}
// Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this
// `SparkContext` is stopped.
localProperties.remove()
// Unset YARN mode system env variable, to allow switching between cluster types.
SparkContext.clearActiveContext()
logInfo("Successfully stopped SparkContext")
}```
Exactly where the number of attempts re tracked before stopping it?
解决方案
推荐阅读
- python - 如何在急切模式下迭代 tf.tensor
- linux - 在 Sagemaker 处理 API 运行的 Docker 容器上构建一个大图(~44GB)
- spring-integration - 有没有办法在子流上调用 Scatter Gather 并保持执行顺序?
- javascript - 当我不知道对象数组的键/值时,最好的方法是什么?
- css - shape-outside css 属性不适用于 react 和 styled 组件
- sql - 由小于固定数字的数字组成的所有表格
- c# - 扫描瓷砖地图以查找有效区域
- python - ImportError : 无法导入名称
从 在 Linux 上 - apache-spark - Pyspark 无法找到 bigquery 数据源
- unit-testing - GitAction vstest.console.exe 在 vswheredarennm/Setup-VSTest@v1 中找不到错误