首页 > 解决方案 > Spark Graphx java.lang.OutOfMemoryError

问题描述

我对 Spark GraphX 模块有疑问。我有一个 5 节点集群,每个节点有 23.5G 内存和 24 个内核。我使用 spark-shell 提交代码,所以我在客户端模式下使用 Spark。在我的配置中,我有 1 个主节点和 4 个从节点。这是我的 spark-defaults.conf:

spark.executor.instances                8
spark.executor.memory                   10g
spark.driver.memory                     18g
spark.executor.cores                    10
spark.driver.cores                      18
spark.default.parallelism               144
spark.serializer                        org.apache.spark.serializer.KryoSerializer

我读取并存储了 2 个非常小的文件,总共 40mb 的文件大小。

这是我的代码:

val input1 = sc.textFile("/home/data/spark/nodes.txt")
val vertexArray = input1.map(line => (line.toLong, mutable.Set[VertexId]()))

val input2 = sc.textFile("/home/data/spark/edges.txt")
val splitRdd = input2.map( line => line.split(" ") )
val edgeArray = splitRdd.map(line => Edge(line(0).toLong, line(1).toLong, "bla"))

val vertices: RDD[(VertexId, mutable.Set[VertexId])] = vertexArray
val edges: RDD[Edge[String]] = edgeArray
val graph = Graph(vertices, edges).partitionBy(EdgePartition2D, 4)

var filteredGraph: Graph[mutable.Set[VertexId], String] = graph.mapVertices((vid, vdata) => {
  mutable.Set[VertexId]()
}).cache()
val temp: VertexRDD[mutable.Set[VertexId]] = filteredGraph.aggregateMessages[mutable.Set[VertexId]](
  triplet => {
    if(triplet.dstId < 0){
      triplet.sendToDst(mutable.Set[VertexId](triplet.srcId))
    }
  },
  (oldSet, newSet) => oldSet ++ newSet // Merge Message
)
filteredGraph = filteredGraph.joinVertices(temp)((id, oldSet, newSet) => newSet).cache()
val temp2: VertexRDD[mutable.Set[VertexId]] = filteredGraph.aggregateMessages[mutable.Set[VertexId]](
  triplet => {
    if(triplet.dstId > 0){
      triplet.sendToDst(triplet.srcAttr.filter(id=>triplet.dstId!=id && triplet.dstId < id))
    }
  },
  (oldSet, newSet) => oldSet ++ newSet // Merge Message
)
val candidatesRDD: RDD[(Long, List[Long])] = temp2.map(vertex => {
  (vertex._1.asInstanceOf[Long], vertex._2.asInstanceOf[ mutable.Set[Long]].toList)
})


val newNames = Seq("rid", "candidates")
val candidatesDF = candidatesRDD.toDF(newNames: _*)
val candidatesDFMod = candidatesDF.withColumn("candidates", explode($"candidates"))
candidatesDFMod.show

如果我进行计算,我会在几次之后得到java.lang.OutOfMemoryError: Java heap space一个执行者的异常。在此 Spark 尝试再次计算它之后,它会重新启动阶段,但最终会再次出现相同的异常。为什么会这样?计算填满了完整的 10G 执行器内存。我是否有错误的 Spark 配置?我尝试了我的 spark-defaults.conf 的几种排列。我尝试了每个节点 3 个 Executor 等等,我更改了内存大小等等。但每次它都以相同的异常结束。

也许有人对这个问题有想法?

亲切的问候

标签: javascalaapache-sparkrddspark-graphx

解决方案


spark.executor.instances 8

spark.executor.cores 10

val graph = Graph(vertices, edges).partitionBy(EdgePartition2D, 4) ^^^

如果您只创建 4 个分区,那么拥有 8 个执行程序(每个执行程序有 10 个内核)是没有意义的。请记住,执行程序上的所有分区必须一起放入内存中,以避免 GC 抖动。尝试使用更多的分区,以便 10 个分区轻松放入内存中,每个分区可能有几百 MB 的输入数据。另外,请检查每个工作节点上实际可用的 10GB RAM 和执行驱动程序的机器上实际可用的 16GB - 如果您的某些工作节点上没有可用 RAM,您可以减少内核数量和Spark 配置中的内存。


推荐阅读