multithreading - 如何在 Spark Executor 中关闭内核之间的共享单例连接
问题描述
我在 Spark 的单个执行器的所有核心之间使用共享连接。基本上,我创建了单例连接对象,以便在单个执行器的核心之间共享,以便在核心之间共享,并且每个执行器只有 1 个连接。
object SingletonConnection {
private var connection: Connection = null
def getConnection(url: String, username: String, password: String): Connection = synchronized {
if (connection == null) {
connection = DriverManager.getConnection(url, username, password)
}
connection
}
}
Spark执行器代码:
dataFrame.foreachPartition { batch =>
if (batch.nonEmpty) {
lazy val dbConnection = SingletonConnection
val dbc = dbConnection.getConnection(url, user, password)
// do some operatoins
st.addBatch()
}
st.executeBatch()
}
}
catch {
case exec: BatchUpdateException =>
var ex: SQLException = exec
while (ex != null) {
ex.printStackTrace()
ex = ex.getNextException
}
throw exec
}
}
}
这里的问题是,我无法关闭连接。因为我不知道特定核心何时完成执行。如果我最终关闭连接,一旦一个核心完成其任务,它就会关闭连接,这会导致所有其他核心停止,因为共享连接已关闭。
由于我没有在此处关闭连接,因此即使在任务完成后连接仍保持打开状态。我怎样才能使这个过程正常工作,以便只有在所有核心完成任务后才能关闭连接。
解决方案
我使用 Java 实现了它,所以我可以给你一些线索。
在 SingletonConnection 类中,我创建了一个线程安全的累加器。每次打开连接时,累加器都会加一。每次关闭连接前,累加器减一,检查累加器是否为零。当累加器为零时,您可以关闭连接。
当其他正在运行的线程仍在使用连接时,这不会关闭连接。但这会让您创建比您想象的更多的连接(分区数量)。
推荐阅读
- wordpress - 如何在 iframe 中放置简码?
- javascript - 将媒体查询动态添加到页面并覆盖来自应用程序端生成的 html 的样式
- python-3.x - 将时间增量转换为熊猫中连续时间点的整数
- php - 通过单击按钮 CodeIgniter 加载视图
- python - Python - 随机延迟调度
- python - python在字典中附加一个带有字典的数组
- dictionary - 如何在 Map 中找到最小元素并返回一个元组(键,最小元素)?
- excel - 为什么当我在 excel VBA 上运行这个宏时什么都没有发生?我错过了什么吗?
- html - 为什么我的第一行的高度大于其他行 HTML
- python - 如何在单个 Django 视图中跨多个模型进行查询?