首页 > 解决方案 > Spark Graphframes 大型数据集和内存问题

问题描述

我想在相对较大的图 35 亿节点 900 亿边上运行 pagerank。我一直在尝试不同的集群大小来让它运行。但首先是代码:

from pyspark.sql import SparkSession
import graphframes

spark = SparkSession.builder.getOrCreate()

edges_DF = spark.read.parquet('s3://path/to/edges') # 1.4TB total size
verts_DF   = spark.read.parquet('s3://path/to/verts') # 25GB total size

graph_GDF = graphframes.GraphFrame(verts_DF, edges_DF)
graph_GDF = graph_GDF.dropIsolatedVertices()

result_df   = graph_GDF.pageRank(resetProbability=0.15, tol=0.1)
pagerank_df = result_df.vertices
pagerank_df.write.parquet('s3://path/to/output', mode='overwrite')

我从一开始就经历了很多垃圾收集问题。所以我为集群尝试了不同的设置和大小。我主要关注了两篇文章:

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

我在亚马逊 EMR 上运行集群。这些是我目前使用的相关设置:

"spark.jars.packages": "org.apache.hadoop:hadoop-aws:2.7.6,graphframes:graphframes:0.7.0-spark2.4-s_2.11",
"spark.dynamicAllocation.enabled": "false",
"spark.network.timeout":"1600s",
"spark.executor.heartbeatInterval":"120s",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.serializer":"org.apache.spark.serializer.KryoSerializer",
"spark.sql.shuffle.partitions":"1216"

"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"

"maximizeResourceAllocation": "true"

"fs.s3.maxConnections": "5000",
"fs.s3.consistent": "true",
"fs.s3.consistent.throwExceptionOnInconsistency":"false",
"fs.s3.consistent.retryPolicyType":"fixed",
"fs.s3.consistent.retryPeriodSeconds":"10"

我尝试了集群大小我的第一个似乎可行的实验是具有以下参数的集群:--deploy-mode cluster --num-executors 75 --executor-cores 5 --executor-memory 36g --driver-memory 36g --driver-cores 5

有了这个配置GC时间,一切都在工作,但由于它是一个测试集群,它总共有非常“小”的内存,2.7 TB过了一段时间我也得到了ExecutorLostFailure (executor 54 exited caused by one of the running tasks) Reason: Container from a bad node Exit status: 137.我认为这是因为我留下了node很少的 RAM。所以我重新运行了整个过程,但这一次--executor-cores 5 --executor-memory 35g我的问题马上就GC出现了,我的集群表现得很奇怪。所以我想我理解了这个问题,高GC时间的原因不是每个执行程序的内存不足。

我启动的下一个集群使用以下参数:--deploy-mode cluster --num-executors 179 --executor-cores 5 --executor-memory 45g --driver-memory 45g --driver-cores 5

因此,与以前一样,每个执行程序更大的集群和更多的内存。一切都运行顺利,我注意到ganglia第一步大约5.5 TB是 ram。

虽然我理解使用更少的可用内核并扩大每个执行程序的内存会使程序更快的问题,但我猜它与verts_DF大约 25gb 的大小有关,这样它将适合每个执行程序的内存执行器留出空间计算(25GB * 179 几乎是 5.5TB)。

因此,我启动的下一个集群具有相同数量的节点,但我将执行器的大小调整为:--num-executors 119 --executor-cores 5 --executor-memory 75g

瞬间所有的问题都回来了!集群GC挂起的大部分时间ganglia我可以看到 RAM 填充了 9 个可用 TB 中的 8 个。我很困惑。我回去并--num-executors 179 --executor-cores 5 --executor-memory 45g再次启动集群,幸运的是这很容易做到,EMR因为我可以克隆它。但现在这种配置也不起作用。高倍GC集群8TB立即命中已用内存。

这里发生了什么?感觉就像我玩轮盘赌一样,有时相同的配置有效,有时却无效?

标签: apache-sparkpysparkamazon-emrgraphframes

解决方案


如果有人在一段时间后仍然偶然发现这一点,它就会意识到问题在于如何graphxgraphframes加载图表。两者都试图生成它们正在加载的图的所有三元组,其中非常大的图会导致OOM错误,因为具有 35 亿个节点和 700 亿条边的图已经诅咒了其中的许多。我通过在pyspark. 它肯定不如scala实现快,但它可以扩展并且不会遇到所描述的三元组问题。我在 github 上发布了它 https://github.com/thagorx/spark_pagerank


推荐阅读