首页 > 解决方案 > 如何在 Spark 结构化流中使用连接池覆盖 ForeachWriter

问题描述

我想在 Spark 结构化流中使用连接池,但我不想使用 writeStream 来新建连接池,我应该如何更新?

现在我想在 application.conf 中设置变量,在 main 方法中获取 application.conf 位置之前加载对象,如下所示:

class JdbcSink extends ForeachWriter[Row] with Serializable with Settings {
  self: SinkStatement =>

  import JdbcSink.dsPool

  var connection: Connection = _
  //the sql statement
  var statement: Statement = _

  //open
  def open(partitionId: Long, version: Long): Boolean = {
    connection = dsPool.getConnection()
    statement = connection.createStatement()
    true
  }

  //execute
  def process(value: Row): Unit = {
    //execute
    statement.executeUpdate(this.make(value))
  }


  //close
  def close(errorOrNull: Throwable): Unit = {
    //close the connection
    dsPool.evictConnection(connection)
  }

}
object JdbcSink extends Settings {


  val config = new HikariConfig
  config.setDriverClassName(this.sinkDbDriver)
  config.setJdbcUrl(this.sinkDbUrl)
  config.setUsername(this.sinkDbUser)
  config.setPassword(this.sinkDbPwd)
  config.addDataSourceProperty("cachePrepStmts", "true")
  config.addDataSourceProperty("prepStmtCacheSize", "250")
  config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048")
  val dsPool = new HikariDataSource(this.HConfig)
}

我如何获得灵活的设置?像这样,“with Settings”得到“can't initiallize jdbcSink”excetption

      val config = new HikariConfig
      config.setDriverClassName (this.sinkDbDriver)
      config.setJdbcUrl (this.sinkDbUrl)
      config.setUsername (this.sinkDbUser)
      config.setPassword (this.sinkDbPwd)
      config.addDataSourceProperty ("cachePrepStmts", "true")
      config.addDataSourceProperty ("prepStmtCacheSize", "250")
      config.addDataSourceProperty ("prepStmtCacheSqlLimit", "2048")
      val dsPool = new HikariDataSource (this.HConfig)

标签: apache-spark

解决方案


推荐阅读