首页 > 解决方案 > 多个联合后如何解决 Apache Spark StackOverflowError

问题描述

我有一个 Spark Scala 程序,它使用 REST API 批量获取数据,一旦检索到所有数据,我就会对它们进行操作。

当前计划:

重现问题的简单程序:

    def main(args: Array[String]) = {
     val conf = new SparkConf().setAppName("Union test").setMaster("local[1]")
     val sc = new SparkContext(conf)
     val limit = 1000;
     var rdd = sc.emptyRDD[Int]
     for (x <- 1 to limit) {
       val currentRdd = sc.parallelize(x to x + 3)
       rdd = rdd.union(currentRdd)
     }
     println(rdd.sum())
   }

问题: - 当批次数很高时,程序会抛出StackOverflowErrorException in thread "main" java.lang.StackOverflowError at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply

我假设,当批次数量增加时,RDD 依赖图变得非常复杂并引发错误。

解决此问题的最佳方法是什么?

标签: scalaapache-sparkrdd

解决方案


已经SparkContext.union知道如何正确计算union多个RDDs 的 a:

val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
val rdd = sc.union(rdds)

或者,您可以尝试使用辅助函数来避免创建长链unions:

val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
val rdd = balancedReduce(rdds)(_ union _)

它应该工作的原因与链接答案中的基本相同:O(n)unions 炸毁堆栈,O(log(n))-high 二叉树unions 没有。


推荐阅读