apache-spark - Spark Graphframes 大型数据集和内存问题
问题描述
我想在相对较大的图 35 亿节点 900 亿边上运行 pagerank。我一直在尝试不同的集群大小来让它运行。但首先是代码:
from pyspark.sql import SparkSession
import graphframes
spark = SparkSession.builder.getOrCreate()
edges_DF = spark.read.parquet('s3://path/to/edges') # 1.4TB total size
verts_DF = spark.read.parquet('s3://path/to/verts') # 25GB total size
graph_GDF = graphframes.GraphFrame(verts_DF, edges_DF)
graph_GDF = graph_GDF.dropIsolatedVertices()
result_df = graph_GDF.pageRank(resetProbability=0.15, tol=0.1)
pagerank_df = result_df.vertices
pagerank_df.write.parquet('s3://path/to/output', mode='overwrite')
我从一开始就经历了很多垃圾收集问题。所以我为集群尝试了不同的设置和大小。我主要关注了两篇文章:
https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html
我在亚马逊 EMR 上运行集群。这些是我目前使用的相关设置:
"spark.jars.packages": "org.apache.hadoop:hadoop-aws:2.7.6,graphframes:graphframes:0.7.0-spark2.4-s_2.11",
"spark.dynamicAllocation.enabled": "false",
"spark.network.timeout":"1600s",
"spark.executor.heartbeatInterval":"120s",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.serializer":"org.apache.spark.serializer.KryoSerializer",
"spark.sql.shuffle.partitions":"1216"
"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"
"maximizeResourceAllocation": "true"
"fs.s3.maxConnections": "5000",
"fs.s3.consistent": "true",
"fs.s3.consistent.throwExceptionOnInconsistency":"false",
"fs.s3.consistent.retryPolicyType":"fixed",
"fs.s3.consistent.retryPeriodSeconds":"10"
我尝试了集群大小我的第一个似乎可行的实验是具有以下参数的集群:--deploy-mode cluster --num-executors 75 --executor-cores 5 --executor-memory 36g --driver-memory 36g --driver-cores 5
有了这个配置GC
时间,一切都在工作,但由于它是一个测试集群,它总共有非常“小”的内存,2.7 TB
过了一段时间我也得到了ExecutorLostFailure (executor 54 exited caused by one of the running tasks) Reason: Container from a bad node Exit status: 137.
我认为这是因为我留下了node
很少的 RAM。所以我重新运行了整个过程,但这一次--executor-cores 5 --executor-memory 35g
我的问题马上就GC
出现了,我的集群表现得很奇怪。所以我想我理解了这个问题,高GC
时间的原因不是每个执行程序的内存不足。
我启动的下一个集群使用以下参数:--deploy-mode cluster --num-executors 179 --executor-cores 5 --executor-memory 45g --driver-memory 45g --driver-cores 5
因此,与以前一样,每个执行程序更大的集群和更多的内存。一切都运行顺利,我注意到ganglia
第一步大约5.5 TB
是 ram。
虽然我理解使用更少的可用内核并扩大每个执行程序的内存会使程序更快的问题,但我猜它与verts_DF
大约 25gb 的大小有关,这样它将适合每个执行程序的内存执行器留出空间计算(25GB * 179 几乎是 5.5TB)。
因此,我启动的下一个集群具有相同数量的节点,但我将执行器的大小调整为:--num-executors 119 --executor-cores 5 --executor-memory 75g
瞬间所有的问题都回来了!集群GC
挂起的大部分时间ganglia
我可以看到 RAM 填充了 9 个可用 TB 中的 8 个。我很困惑。我回去并--num-executors 179 --executor-cores 5 --executor-memory 45g
再次启动集群,幸运的是这很容易做到,EMR
因为我可以克隆它。但现在这种配置也不起作用。高倍GC
集群8TB
立即命中已用内存。
这里发生了什么?感觉就像我玩轮盘赌一样,有时相同的配置有效,有时却无效?
解决方案
如果有人在一段时间后仍然偶然发现这一点,它就会意识到问题在于如何graphx
或graphframes
加载图表。两者都试图生成它们正在加载的图的所有三元组,其中非常大的图会导致OOM
错误,因为具有 35 亿个节点和 700 亿条边的图已经诅咒了其中的许多。我通过在pyspark
. 它肯定不如scala
实现快,但它可以扩展并且不会遇到所描述的三元组问题。我在 github 上发布了它
https://github.com/thagorx/spark_pagerank
推荐阅读
- dask - 时间维度的块大小不适用于 xr.open_mfdataset
- java - Java图形(颜色和字体)
- c++ - 反转计数的错误答案?
- javascript - 在 JavaScript 中访问嵌套数组
- python - 函数缺少 1 个必需的位置参数
- python - Pandas:如何对不连续的日期列进行分组?
- assembly - Linux 下 x86 程序集中系统调用的错误处理
- html - 将 div 元素定位在绝对定位的图像上
- nativescript - 如何在 RadListView 中创建项目阴影?
- apache - Spring Boot,带有 MTOM 的 Apache CXF 3.2.5 发送空附件