首页 > 解决方案 > 使用 2 个 scala 期货在 foreachPartition 内进行并行 JDBC 插入是否安全?

问题描述

我目前有一个插入 MySQL 数据库的 Spark 流应用程序。我遵循的模式

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

可以在spark 流文档中找到。问题是我想这样修改它,

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>

    val f1 = Future {
      //insert data to database 1
      val connection = createNewConnection(db1_config)
      partitionOfRecords.foreach(record => connection.send(record))
      connection.close()
    }
  }

  val f2 = Future {
    //insert data to database 2
    val connection = createNewConnection(db2_config)
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }

  //await for both f1 and f2 to finish

}

实际上,我发现这行得通,但我想确保我不会在脚上开枪。我在我的应用程序中使用1 个执行器和 1 个核心对此进行了测试,发现当我使用全局执行上下文时,两个期货并行工作。所以我假设全局执行上下文提供了 2 个线程来处理。但是这两个线程是从哪里来的呢?当我将 Spark 作业提交给 YARN 时,我指定了 1 个执行器和 1 个核心。当我在那台机器上执行 LSCPU 时,我发现每个核心有 2 个线程。那么我是在该内核上使用这两个线程还是全局执行上下文为我提供了来自另一个内核的可用线程?我只想知道这是否是一种安全模式,而不是从其他应用程序中“窃取”线程。

标签: multithreadingapache-sparkconcurrencyspark-streaming

解决方案


推荐阅读