How does Spark track the number of attempts before stopping a Spark session?

Problem description

How does Spark track the configured spark.yarn.maxAppAttempts before stopping a Spark session? Spark keeps the same application_id for all retries. Based on the Git source, SparkSession.stop() is just:

```scala
def stop(): Unit = {
  sparkContext.stop()
}
```

https://github.com/jerryshao/apache-spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#:~:text=def%20stop()%3A%20Unit,%7D
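
The "same application_id for all retries" behavior is easy to confirm from inside a running job with the standard SparkContext API. A quick check, assuming a spark-shell where `spark` is the predefined session:

```scala
// applicationId stays fixed across YARN retries of one submission, while
// applicationAttemptId changes with each attempt (it is None on cluster
// managers that have no notion of attempts).
val sc = spark.sparkContext
println(s"application id: ${sc.applicationId}")
println(s"attempt id:     ${sc.applicationAttemptId.getOrElse("n/a")}")
```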

SparkSession.stop() in turn calls the following stop function on SparkContext:

```scala
  /**
   * Shut down the SparkContext.
   */
  def stop(): Unit = {
    if (LiveListenerBus.withinListenerThread.value) {
      throw new SparkException(s"Cannot stop SparkContext within listener bus thread.")
    }
    // Use the stopping variable to ensure no contention for the stop scenario.
    // Still track the stopped variable for use elsewhere in the code.
    if (!stopped.compareAndSet(false, true)) {
      logInfo("SparkContext already stopped.")
      return
    }
    if (_shutdownHookRef != null) {
      ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
    }

    Utils.tryLogNonFatalError {
      postApplicationEnd()
    }
    Utils.tryLogNonFatalError {
      _ui.foreach(_.stop())
    }
    if (env != null) {
      Utils.tryLogNonFatalError {
        env.metricsSystem.report()
      }
    }
    Utils.tryLogNonFatalError {
      _cleaner.foreach(_.stop())
    }
    Utils.tryLogNonFatalError {
      _executorAllocationManager.foreach(_.stop())
    }
    if (_listenerBusStarted) {
      Utils.tryLogNonFatalError {
        listenerBus.stop()
        _listenerBusStarted = false
      }
    }
    Utils.tryLogNonFatalError {
      _eventLogger.foreach(_.stop())
    }
    if (_dagScheduler != null) {
      Utils.tryLogNonFatalError {
        _dagScheduler.stop()
      }
      _dagScheduler = null
    }
    if (env != null && _heartbeatReceiver != null) {
      Utils.tryLogNonFatalError {
        env.rpcEnv.stop(_heartbeatReceiver)
      }
    }
    Utils.tryLogNonFatalError {
      _progressBar.foreach(_.stop())
    }
    _taskScheduler = null
    // TODO: Cache.stop()?
    if (_env != null) {
      Utils.tryLogNonFatalError {
        _env.stop()
      }
      SparkEnv.set(null)
    }
    if (_statusStore != null) {
      _statusStore.close()
    }
    // Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this
    // `SparkContext` is stopped.
    localProperties.remove()
    // Unset YARN mode system env variable, to allow switching between cluster types.
    SparkContext.clearActiveContext()
    logInfo("Successfully stopped SparkContext")
  }
```
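
Note that nothing in the stop() body above references spark.yarn.maxAppAttempts. If you want to see which attempt an application is currently on, the attempt id can be recovered from the container ID that YARN injects into every container's environment; Spark's own YARN ApplicationMaster bootstraps its attempt id the same way. A minimal sketch, assuming a Hadoop 2.8+ client on the classpath:

```scala
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.ContainerId

object AttemptIdSketch {
  // YARN sets CONTAINER_ID in each container's environment; the application
  // attempt id is embedded inside it.
  def currentAttemptId(): Int = {
    val containerIdString =
      System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
    val containerId = ContainerId.fromString(containerIdString) // Hadoop 2.8+
    containerId.getApplicationAttemptId.getAttemptId
  }
}
```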

Where exactly is the number of attempts tracked before stopping it?

Tags: scala, apache-spark

Solution
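
The short answer: the attempt count is not tracked anywhere in the stop() path. YARN's ResourceManager owns the counter. Each time the ApplicationMaster exits without unregistering as SUCCEEDED, the ResourceManager launches a new attempt under the same application_id, up to the effective limit; a clean stop after a successful unregister is exactly what prevents a further retry. Spark's part is to hand the configured limit to YARN at submission time: spark.yarn.maxAppAttempts, capped by YARN's global yarn.resourcemanager.am.max-attempts. A minimal sketch of that resolution logic, modeled on YarnRMClient.getMaxRegAttempts in Spark's YARN module (simplified, not the verbatim source):

```scala
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.SparkConf

object MaxAttemptsSketch {
  def effectiveMaxAppAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
    // YARN's global cap: yarn.resourcemanager.am.max-attempts (default 2).
    val yarnMax = yarnConf.getInt(
      YarnConfiguration.RM_AM_MAX_ATTEMPTS,
      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
    // spark.yarn.maxAppAttempts, if set, is honored only up to that cap.
    sparkConf.getOption("spark.yarn.maxAppAttempts")
      .map(_.toInt)
      .fold(yarnMax)(math.min(_, yarnMax))
  }
}
```

With this in place, SparkSession.stop() never needs to know the attempt number: it simply tears down the driver, and whether YARN starts another attempt is decided entirely on the ResourceManager side.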

