apache-spark - Spark 创建新的 Spark 会话/上下文并从失败中恢复
问题描述
我工作的 Spark 平台不稳定,每次都以各种原因失败。这份工作并没有在 Hadoop 管理器上死掉,而是在 Running 中徘徊,所以我想杀了它。
在同一个 python 脚本中,一旦出现故障,我想终止当前的 spark 会话,创建另一个 sparkcontext/session 并从最后一个检查点开始。我确实有频繁的检查点以避免 DAG 变得太长。它倾向于失败的部分是一个while循环,所以我可以负担得起当前的df
知道如何实现吗?
我最初的想法是
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("test_Terminal").config("spark.sql.broadcastTimeout", "36000").getOrCreate()
flag_finish = False
flag_fail=False
while (!flag_finish) :
if flag_fail : #kill current erroneous session
sc.stop()
conf = pyspark.SparkConf().setAll([('spark.executor.memory', '60g'),
('spark.driver.memory','30g'),('spark.executor.cores', '16'),
('spark.driver.cores', '24'),('spark.cores.max', '32')])
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
df = ...#read back from checkpoint or disk
#process with current df or df picked up
while .. :#this is where server tend to fail my job due after some time
try :
##df processing and update
...
df.checkpoint()
df.count() #activate checkpoint
if complete :
flag_finished = True
exception Exception as e:
flag_fail=True
continue
另一个问题是如何从检查点显式读取(由 df.checkpoint() 完成)
解决方案
非流式处理中的检查点是使用服务器血统。它不是为在不同应用程序或不同 Spark 上下文之间共享数据而设计的。
你想要的实际上是不可能的。
推荐阅读
- c# - 检查两组点是否与源点位于不同的半球
- polymer - 聚合物 3 创建元素运行时并在插槽内注入 html
- javascript - Javascript数组排序并制定与日期范围相同的时间表
- google-maps - Google Places api 给我错误 Daily Quota Exceeded
- php - 在 MySQL 中插入 NULL,如果字符串在 PHP 中爆炸后为空
- java - LDAP 服务器前端 Java 库
- java - AbstractArgumentFactory.build 的 JDBI3 逆向
- javascript - 如何在反应中加载 GIF 文件?
- html - 在(以前)动画列表上没有使用 MacBooks 触控板滚动
- ios - Pod 需要 Swift 版本?