scala - 多个联合后如何解决 Apache Spark StackOverflowError
问题描述
我有一个 Spark Scala 程序,它使用 REST API 批量获取数据,一旦检索到所有数据,我就会对它们进行操作。
当前计划:
对于每个批次,创建 RDD 并将其与使用先前 API 调用创建的先前 RDD 合并
rdd.union(currentRdd)
。对最终的 RDD 进行操作
重现问题的简单程序:
def main(args: Array[String]) = {
val conf = new SparkConf().setAppName("Union test").setMaster("local[1]")
val sc = new SparkContext(conf)
val limit = 1000;
var rdd = sc.emptyRDD[Int]
for (x <- 1 to limit) {
val currentRdd = sc.parallelize(x to x + 3)
rdd = rdd.union(currentRdd)
}
println(rdd.sum())
}
问题:
- 当批次数很高时,程序会抛出StackOverflowError
:Exception in thread "main" java.lang.StackOverflowError
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply
我假设,当批次数量增加时,RDD 依赖图变得非常复杂并引发错误。
解决此问题的最佳方法是什么?
解决方案
已经SparkContext.union
知道如何正确计算union
多个RDD
s 的 a:
val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
val rdd = sc.union(rdds)
或者,您可以尝试使用此辅助函数来避免创建长链union
s:
val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
val rdd = balancedReduce(rdds)(_ union _)
它应该工作的原因与链接答案中的基本相同:O(n)
链union
s 炸毁堆栈,O(log(n))
-high 二叉树union
s 没有。
推荐阅读
- java - JavaFX 在 UI 中的两个窗格之间切换
- python - Python将字符串转换为时间
- curl - 语音命令:/usr/lib/arm-linux-gnueabihf/libcurl.so.4:在树莓派上找不到版本“CURL_OPENSSL_3”(语音命令需要)
- python - __init__() 缺少 1 个必需的位置参数:'rec'
- java - 如何在循环中引用具有相同变量名的不同对象
- javascript - Echart:如何设置标记区域以填充 xAxis 中的部分
- python - 如何使用 click_log 控制具有多个模块的 python 项目中的日志记录级别?
- c# - 无法在模态窗口中执行任何操作
- amazon-web-services - 在 AWS 任务中运行两个 docker 映像
- continuous-integration - Teamcity & Udeploy 持续集成和部署