首页 > 解决方案 > Spark 在多个执行器上分发任务

问题描述

我想并行运行一个 SQL 查询,并且能够将并行度控制到 8 个查询。现在,我正在做这段代码。这个想法是创建 8 个分区并允许执行程序并行运行它们。

  (1 to 8).toSeq.toDF.repartition(8) // 8 partitions
  .rdd.mapPartitions(
  x => {
  val conn = createConnection()
    x.foreach{
      s => { // expect the below query be run concurently
      execute(s"SELECT * FROM myTable WHERE col = ${s.get(0)}")
      }
    }
  conn.close()
  x
  }).take(1)

问题是 8 个查询是一一运行的。

我应该如何继续让查询运行 8 by 8 ?

标签: apache-spark

解决方案


当你这样做

val df = (1 to 8).toSeq.toDF.repartition(8)

这不会创建 8 个分区,每个分区有 1 条记录。如果您检查此数据框(参见例如https://stackoverflow.com/a/46032600/1138523),那么您会得到:

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|               0|                0|
|               1|                0|
|               2|                0|
|               3|                0|
|               4|                0|
|               5|                0|
|               6|                4|
|               7|                4|
+----------------+-----------------+

因此,您将只有 2 个非空分区,因此您将拥有最大 2 倍并行度(我在这里问过这个问题:Spark 中的循环分区如何工作?

要制作大小相等的分区,您最好使用

spark.sparkContext.parallelize((0 to 7), numSlices = 8)

代替

(1 to 8).toSeq.toDF.repartition(8).rdd

第一个选项为每个分区提供 1 条记录,第二个选项不是因为它使用循环分区

作为旁注,当你这样做时x.foreach, thenx将被消耗(迭代器只能遍历一次),所以如果你返回x,你总是会得到一个空的迭代器。

所以你的最终代码看起来像这样:

 spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.mapPartitions(
  x => {
  val xL = x.toList  // convert to List
  assert(xL.size==1) // make sure partition has only 1 record

  val conn = createConnection()
    xL.foreach{
      s => { // expect the below query be run concurently
      execute(s"SELECT * FROM myTable WHERE col = ${s}")
      }
    }
  conn.close()
  xL.toIterator
  })
 .collect // trigger all queries

除了使用mapPartitions(这是懒惰的),您还可以使用foreachPartition非懒惰的

由于每个分区只有 1 条记录,因此迭代分区并没有真正的好处,您也可以只使用 plain foreach

 spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.foreach( s=> {
  val conn = createConnection()
  execute(s"SELECT * FROM myTable WHERE col = ${s}")   
  conn.close()
})

推荐阅读