首页 > 解决方案 > Spark 创建新的 Spark 会话/上下文并从失败中恢复

问题描述

我工作的 Spark 平台不稳定,每次都以各种原因失败。这份工作并没有在 Hadoop 管理器上死掉,而是在 Running 中徘徊,所以我想杀了它。

在同一个 python 脚本中,一旦出现故障,我想终止当前的 spark 会话,创建另一个 sparkcontext/session 并从最后一个检查点开始。我确实有频繁的检查点以避免 DAG 变得太长。它倾向于失败的部分是一个while循环,所以我可以负担得起当前的df

知道如何实现吗?

我最初的想法是

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("test_Terminal").config("spark.sql.broadcastTimeout", "36000").getOrCreate()

flag_finish = False
flag_fail=False
while (!flag_finish) :
   if flag_fail : #kill current erroneous session 
      sc.stop()
      conf = pyspark.SparkConf().setAll([('spark.executor.memory', '60g'), 
      ('spark.driver.memory','30g'),('spark.executor.cores', '16'), 
      ('spark.driver.cores', '24'),('spark.cores.max', '32')])
      sc = pyspark.SparkContext(conf=conf)
      spark = SparkSession(sc)
      df = ...#read back from checkpoint or disk

   #process with current df or df picked up
   while .. :#this is where server tend to fail my job due after some time
       try :
          ##df processing and update
          ...
          df.checkpoint()
          df.count() #activate checkpoint 

          if complete :
              flag_finished = True
       exception Exception as e:
          flag_fail=True
          continue

另一个问题是如何从检查点显式读取(由 df.checkpoint() 完成)

标签: apache-sparksessionexceptionpyspark

解决方案


非流式处理中的检查点是使用服务器血统。它不是为在不同应用程序或不同 Spark 上下文之间共享数据而设计的。

你想要的实际上是不可能的。


推荐阅读