apache-spark - Spark 在多个执行器上分发任务
问题描述
我想并行运行一个 SQL 查询,并且能够将并行度控制到 8 个查询。现在,我正在做这段代码。这个想法是创建 8 个分区并允许执行程序并行运行它们。
(1 to 8).toSeq.toDF.repartition(8) // 8 partitions
.rdd.mapPartitions(
x => {
val conn = createConnection()
x.foreach{
s => { // expect the below query be run concurently
execute(s"SELECT * FROM myTable WHERE col = ${s.get(0)}")
}
}
conn.close()
x
}).take(1)
问题是 8 个查询是一一运行的。
我应该如何继续让查询运行 8 by 8 ?
解决方案
当你这样做
val df = (1 to 8).toSeq.toDF.repartition(8)
这不会创建 8 个分区,每个分区有 1 条记录。如果您检查此数据框(参见例如https://stackoverflow.com/a/46032600/1138523),那么您会得到:
+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
| 0| 0|
| 1| 0|
| 2| 0|
| 3| 0|
| 4| 0|
| 5| 0|
| 6| 4|
| 7| 4|
+----------------+-----------------+
因此,您将只有 2 个非空分区,因此您将拥有最大 2 倍并行度(我在这里问过这个问题:Spark 中的循环分区如何工作?)
要制作大小相等的分区,您最好使用
spark.sparkContext.parallelize((0 to 7), numSlices = 8)
代替
(1 to 8).toSeq.toDF.repartition(8).rdd
第一个选项为每个分区提供 1 条记录,第二个选项不是因为它使用循环分区
作为旁注,当你这样做时x.foreach
, thenx
将被消耗(迭代器只能遍历一次),所以如果你返回x
,你总是会得到一个空的迭代器。
所以你的最终代码看起来像这样:
spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.mapPartitions(
x => {
val xL = x.toList // convert to List
assert(xL.size==1) // make sure partition has only 1 record
val conn = createConnection()
xL.foreach{
s => { // expect the below query be run concurently
execute(s"SELECT * FROM myTable WHERE col = ${s}")
}
}
conn.close()
xL.toIterator
})
.collect // trigger all queries
除了使用mapPartitions
(这是懒惰的),您还可以使用foreachPartition
非懒惰的
由于每个分区只有 1 条记录,因此迭代分区并没有真正的好处,您也可以只使用 plain foreach
:
spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.foreach( s=> {
val conn = createConnection()
execute(s"SELECT * FROM myTable WHERE col = ${s}")
conn.close()
})
推荐阅读
- swagger-ui - 参考 OpenAPI 3.0 中的 self
- java - 如何调用我已经在 XML 中的数据库中加载的数组列表
- android - 如何从 Firebase 实时数据库中读取推送的数据
- javascript - 选项卡作为单选选择并清除选项卡更改基础上的复选框 6
- angular - 使用 webpack 在 @angular/elements 组件包中重用 Angular
- excel - 如何将工作簿中的所有工作表导出到单个文本文件?
- azure-cognitive-search - 转义特殊字符和使用 Azure 搜索选择正确分析器的问题
- r - 在列表中的样本预测之外,指定要在 R 中预测的列表名称
- elasticsearch - 如何通过单个查询在弹性搜索中获得文档的排名?
- hive - Left outer join query with Null check in hive is not working