首页 > 解决方案 > 如何在 Spark Executor 中关闭内核之间的共享单例连接

问题描述

我在 Spark 的单个执行器的所有核心之间使用共享连接。基本上,我创建了单例连接对象,以便在单个执行器的核心之间共享,以便在核心之间共享,并且每个执行器只有 1 个连接。

object SingletonConnection {

private var connection: Connection = null

def getConnection(url: String, username: String, password: String): Connection = synchronized {
if (connection == null) {
  connection = DriverManager.getConnection(url, username, password)
}
connection
}
}

Spark执行器代码:

dataFrame.foreachPartition { batch =>
  if (batch.nonEmpty) {
    lazy val dbConnection = SingletonConnection
    val dbc = dbConnection.getConnection(url, user, password)

    // do some operatoins


          st.addBatch()
        }
        st.executeBatch()
      }
    }
    catch {
      case exec: BatchUpdateException =>
        var ex: SQLException = exec
        while (ex != null) {
          ex.printStackTrace()
          ex = ex.getNextException
        }
        throw exec
    }

  }
}

这里的问题是,我无法关闭连接。因为我不知道特定核心何时完成执行。如果我最终关闭连接,一旦一个核心完成其任务,它就会关闭连接,这会导致所有其他核心停止,因为共享连接已关闭。

由于我没有在此处关闭连接,因此即使在任务完成后连接仍保持打开状态。我怎样才能使这个过程正常工作,以便只有在所有核心完成任务后才能关闭连接。

标签: multithreadingscalaapache-sparkdatabase-connectionexecutor

解决方案


我使用 Java 实现了它,所以我可以给你一些线索。

在 SingletonConnection 类中,我创建了一个线程安全的累加器。每次打开连接时,累加器都会加一。每次关闭连接前,累加器减一,检查累加器是否为零。当累加器为零时,您可以关闭连接。

当其他正在运行的线程仍在使用连接时,这不会关闭连接。但这会让您创建比您想象的更多的连接(分区数量)。


推荐阅读