首页 > 解决方案 > Apache Spark 中的异常处理

问题描述

我一直在研究在 Apache Spark 作业中处理异常的正确方法。我已经阅读了 Stackoverflow 中的不同问题,但我仍然没有得出结论。从我的角度来看,有三种处理异常的方法:

  1. 尝试在将要执行计算的 lambda 函数周围捕获/阻塞。这很棘手,因为必须将块放置在触发惰性计算的代码周围。如果发生错误,那么我认为不会有任何 RDD 可以使用(取自博客条目)

     val lines: RDD[String] = sc.textFile("large_file.txt")
     val tokens = 
     lines.flatMap(_ split " ")
        .map(s => s(10))
     try {
        // This try-catch block catch all the exceptions thrown by the 
        // preceding transformations. 
        tokens.saveAsTextFile("/some/output/file.txt")
     } catch {
       case e : StringIndexOutOfBoundsException => 
       // Doing something in response of the exception
     }
    
  2. 在 lambda 函数内尝试 catch/block:这意味着在 lambda 函数内确定捕获的异常的正确输出。

    rdd.map({
     Try(fn) match{
         case Success: _
         case Failure:<<Record with error flag>>
     }).filter(record.errorflag==null)
    
  3. 让异常传播。该任务将失败,Spark 框架将再次重新启动该任务。当错误是由代码范围之外的原因引起时,此方法有效。例如(内存泄漏,与另一个服务的连接暂时丢失。)

处理异常的正确方法是什么?我想这取决于你想通过 RDD 操作实现什么。如果 RDD 记录之一中的错误意味着输出无效,那么选项 1 就是要走的路。如果我们预计某些记录会失败,我们会选择选项 2。选项 3 甚至不需要做出选择,因为它是平台的正常行为。

标签: scalaapache-sparkerror-handling

解决方案


过去,除了输入参数检查外,我们并没有使用 try/catch 方法。

其余的我们只是依靠检查返回码,如下所示:

spark-submit --master yarn ... bla bla
ret_val=$?  
...
 

为什么?因为您需要纠正一些一般性的问题,我们需要重新开始。很难动态地纠正某些事情。您的日程安排工具也可以处理这个问题,Rundeck、Airflow...等。

更高级的重启选项是可能的,但只是变得复杂,但可以完成。正如您在选项 2 中提到的那样。但从未见过这样做过。


推荐阅读