scala - Apache Spark 中的异常处理
问题描述
我一直在研究在 Apache Spark 作业中处理异常的正确方法。我已经阅读了 Stackoverflow 中的不同问题,但我仍然没有得出结论。从我的角度来看,有三种处理异常的方法:
尝试在将要执行计算的 lambda 函数周围捕获/阻塞。这很棘手,因为必须将块放置在触发惰性计算的代码周围。如果发生错误,那么我认为不会有任何 RDD 可以使用(取自此博客条目)
val lines: RDD[String] = sc.textFile("large_file.txt") val tokens = lines.flatMap(_ split " ") .map(s => s(10)) try { // This try-catch block catch all the exceptions thrown by the // preceding transformations. tokens.saveAsTextFile("/some/output/file.txt") } catch { case e : StringIndexOutOfBoundsException => // Doing something in response of the exception }
在 lambda 函数内尝试 catch/block:这意味着在 lambda 函数内确定捕获的异常的正确输出。
rdd.map({ Try(fn) match{ case Success: _ case Failure:<<Record with error flag>> }).filter(record.errorflag==null)
让异常传播。该任务将失败,Spark 框架将再次重新启动该任务。当错误是由代码范围之外的原因引起时,此方法有效。例如(内存泄漏,与另一个服务的连接暂时丢失。)
处理异常的正确方法是什么?我想这取决于你想通过 RDD 操作实现什么。如果 RDD 记录之一中的错误意味着输出无效,那么选项 1 就是要走的路。如果我们预计某些记录会失败,我们会选择选项 2。选项 3 甚至不需要做出选择,因为它是平台的正常行为。
解决方案
过去,除了输入参数检查外,我们并没有使用 try/catch 方法。
其余的我们只是依靠检查返回码,如下所示:
spark-submit --master yarn ... bla bla
ret_val=$?
...
为什么?因为您需要纠正一些一般性的问题,我们需要重新开始。很难动态地纠正某些事情。您的日程安排工具也可以处理这个问题,Rundeck、Airflow...等。
更高级的重启选项是可能的,但只是变得复杂,但可以完成。正如您在选项 2 中提到的那样。但从未见过这样做过。
推荐阅读
- python-3.x - 加入多个熊猫数据框
- redis - 使用 RedisGears 对接收 pubsub 消息进行计算
- r - 识别和替换公式中的参数,如 R 中的字符串
- python - Python安装pyPdf错误'utf-8'编解码器无法解码位置64的字节0x88
- excel - 在 excel 中从 CustomXmlPart 遍历 XML 时遇到问题
- scipy - 如何合并边界条件并从 scipy.interpolate.Bspline.basis_element 构建基函数,包括边界条件?
- javascript - Javascript在引号中加粗文本字符串
- javascript - 滚动条的禁用区域
- postgresql - 与 Azure VM 上的 PostgreSQL 数据库层失去连接
- java - 由缠绕规则创建的区域