scala - Spark 如何处理涉及 JDBC 数据源的故障场景?
问题描述
我正在编写一个与 Spark 的 JDBC 数据源实现有相似之处的数据源,我想问一下 Spark 如何处理某些故障场景。据我了解,如果执行程序在运行任务时死亡,Spark 将恢复执行程序并尝试重新运行该任务。然而,这在数据完整性和 Spark 的 JDBC 数据源 API(例如 )的上下文中如何发挥作用df.write.format("jdbc").option(...).save()
?
在 JdbcUtils.scala 的 savePartition 函数中,我们看到 Spark 调用了从用户提供的数据库 url/凭据生成的 Java 连接对象的提交和回滚函数(见下文)。但是,如果执行器在 commit() 完成后或调用 rollback() 之前立即死亡,Spark 是否会尝试重新运行任务并再次写入相同的数据分区,实质上是在数据库中创建重复的已提交行?如果 executor 在调用 commit() 或 rollback() 的过程中死亡会发生什么?
try {
...
if (supportsTransactions) {
conn.commit()
}
committed = true
Iterator.empty
} catch {
case e: SQLException =>
...
throw e
} finally {
if (!committed) {
// The stage must fail. We got here through an exception path, so
// let the exception through unless rollback() or close() want to
// tell the user about another problem.
if (supportsTransactions) {
conn.rollback()
}
conn.close()
} else {
...
}
}
解决方案
但是,如果执行程序在 commit() 完成后或调用 rollback() 之前立即死亡,Spark 是否会尝试重新运行任务并再次写入相同的数据分区,实质上是在数据库中创建重复的已提交行?
由于 Spark SQL(它是基于 RDD API 的高级 API)对 JDBC 或任何其他协议的所有特性知之甚少,您会期待什么?更不用说底层的执行运行时,即 Spark Core。
当您编写像df.write.format(“jdbc”).option(...).save()
Spark 这样的结构化查询时,SQL 会使用类似低级程序集的 RDD API 将其转换为分布式计算。由于它试图包含尽可能多的“协议”(包括 JDBC),Spark SQL 的 DataSource API 将大部分错误处理留给了数据源本身。
调度任务的 Spark 核心(不知道甚至不关心任务做什么)只是监视执行,如果任务失败,它将尝试再次执行它(直到默认尝试 3 次失败)。
因此,当您编写自定义数据源时,您会知道如何练习,并且必须在代码中处理此类重试。
处理错误的一种方法是使用TaskContext(例如addTaskCompletionListener
或addTaskFailureListener
)注册任务侦听器。
推荐阅读
- r - 无法在 R 中创建降价文档
- swift - 是否可以在 iOS 上使用 AudioKit 通过 HFP 将音频从 iPhone 内置麦克风流式传输到耳机?
- asp.net-mvc - ASP.Net core + wind auth + 自定义角色:“主域和可信域之间的信任关系失败”
- c# - EF Core LINQ Where with extension 方法未转换为 SQL
- sql - sqlite 上的 SQL Group By 和 COALESCE 问题
- html - Bootstrap:将分页与动态选项卡式界面结合起来 Bug
- mysql - mysql group by和两个总和之间的比率
- python-3.x - 如何定位特定输入以使用 requests 或 requests_html 发布
- python - 与 Pandas 数据帧上的“groupby”结合使用时,将关键字参数添加到“apply”函数的问题
- java - 为什么必须初始化静态最终变量?