首页 > 解决方案 > 一段时间后停止 Spark 会话 - Pyspark

问题描述

我在 spark 中做 ETL,有时需要很多时间。我想在一段时间后优雅地关闭 spark 会话。

我正在 Pyspark 中编写我的代码。

try:
 df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output)
exception:
 spark.stop()

我想在上面的代码中的某个时间后停止火花。

有没有办法在一段时间后优雅地关闭火花会话?

标签: pythonapache-sparkpysparktimerexit

解决方案


我建议使用官方 python Timer优雅地停止 Spark 会话:

import threading

def timer_elapsed():
    print('Timer elapsed')
    if not sc._jsc.sc().isStopped():
      spark.stop()

# wait for 0.5 sec for Spark job to complete
spark_timer = threading.Timer(0.5, timer_elapsed)
spark_timer.start()

try:
  df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output
  print('Spark job finished successfully.')
except Exception as e:
  spark_timer.cancel() # stop timer, we don't need to wait if error occured
  if not sc._jsc.sc().isStopped():
    spark.stop()

注意:如果时间已过或捕获到异常,我们会在两种情况下停止会话。sc._jsc.sc().isStopped在请求停止 Spark 上下文之前,我们检查直接调用 Java API的上下文是否处于活动状态。


推荐阅读