apache-spark - Spark java.lang.OutOfMemoryError:Java 堆空间
问题描述
当我使用 spark 运行模型训练管道时出现上述错误
`val inputData = spark.read
.option("header", true)
.option("mode","DROPMALFORMED")
.csv(input)
.repartition(500)
.toDF("b", "c")
.withColumn("b", lower(col("b")))
.withColumn("c", lower(col("c")))
.toDF("b", "c")
.na.drop()`
inputData有大约 2500 万行,大小约为 2gb。模型构建阶段是这样发生的
val tokenizer = new Tokenizer()
.setInputCol("c")
.setOutputCol("tokens")
val cvSpec = new CountVectorizer()
.setInputCol("tokens")
.setOutputCol("features")
.setMinDF(minDF)
.setVocabSize(vocabSize)
val nb = new NaiveBayes()
.setLabelCol("bi")
.setFeaturesCol("features")
.setPredictionCol("prediction")
.setSmoothing(smoothing)
new Pipeline().setStages(Array(tokenizer, cvSpec, nb)).fit(inputData)
我正在使用以下命令在具有 16gb RAM 的机器上本地运行上述 spark 作业
spark-submit --class holmes.model.building.ModelBuilder ./holmes-model-building/target/scala-2.11/holmes-model-building_2.11-1.0.0-SNAPSHOT-7d6978.jar --master local[*] --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryoserializer.buffer.max=2000m --conf spark.driver.maxResultSize=2g --conf spark.rpc.message.maxSize=1024 --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=50g --driver-memory=12g
oom 错误由 org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:706) 触发(在堆栈跟踪的底部)
日志:
Caused by: java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance(Array.java:75) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1897) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:706)
任何建议都会很棒:)
解决方案
我会尝试的事情:
1) 删除spark.memory.offHeap.enabled=true
驱动程序内存并将其增加到盒子上可用内存的 90%。您可能已经意识到这一点,因为您没有设置执行程序内存,但是在本地模式下,驱动程序和执行程序都在由驱动程序内存控制的同一进程中运行。我还没有尝试过,但该offHeap
功能听起来价值有限。参考
2)一个实际的集群而不是本地模式。更多的节点显然会给你更多的内存。
3a) 如果您想坚持使用本地模式,请尝试使用更少的内核。您可以通过指定在主设置中使用的核心数量来做到这一点,--master local[4]
而不是local[*]
使用所有核心。使用更少的线程同时处理数据将导致在任何给定时间在 RAM 中的数据更少。
3b) 如果您移动到集群,您可能还需要调整执行器核心的数量,原因与上述相同。您可以使用--executor-cores
标志来执行此操作。
4)尝试更多的分区。在您的示例代码中,您重新分区为 500 个分区,也许尝试 1000 或 2000?更多分区意味着每个分区更小,内存压力更小。
推荐阅读
- python - 从变量中减去 - Thonny Python
- splunk - 使用 fluent-plugin-grok-parser 和 splunk-hec 图像
- java - 我可以在不定义任何自定义 TypeMaps 的情况下使用 ModelMapper 映射到非默认构造函数吗?
- bluetooth - 无法解压
- javascript - 获取用户是否使用 vanilla javascript 响应页面结束
- java - 比较字符串(索引)
- c# - 使用 RestSharp 获取 PayPal 访问令牌返回“状态代码:未找到”
- python - 尝试构建 python 应用程序的 Docker 映像时,我得到 'AttributeError: module 'sipbuild.api' has no attribute 'prepare_metadata_for_build_wheel'
- python - 为什么链接提取器会跳过链接?
- excel - Excel - 在一行中搜索特定文本,然后将该值复制并粘贴到特定单元格中