apache-spark - Tensorflow Java 在 YARN 上使用过多内存和火花
问题描述
当使用 tensorflow java 进行推理时,使作业在 YARN 上运行的内存量异常大。这项工作在我的计算机上完美运行(2 核 16Gb RAM),需要 35 分钟才能完成。但是当我尝试在 YARN 上使用 10 个执行程序、16Gb 内存和 16Gb memoryOverhead 运行它时,执行程序会因为使用太多内存而被杀死。
使用 YARN 2.7.3 和 Spark 2.2.1 在 Hortonworks 集群上运行预测。以前我们使用 DL4J 进行推理,一切都在 3 分钟内运行。张量在使用后正确关闭,我们使用 mapPartition 进行预测。每个任务包含大约 20.000 条记录 (1Mb),因此这将使输入张量为 2.000.000x14,输出张量为 2.000.000 (5Mb)。
在 YARN 上运行时传递给 spark 的选项
--master yarn --deploy-mode cluster --driver-memory 16G --num-executors 10 --executor-memory 16G --executor-cores 2 --conf spark.driver.memoryOverhead=16G --conf spark.yarn.executor.memoryOverhead=16G --conf spark.sql.shuffle.partitions=200 --conf spark.tasks.cpu=2
如果我们设置 spark.sql.shuffle.partitions=2000,这个配置可能会起作用,但需要 3 小时
更新:
本地和集群之间的差异实际上是由于缺少过滤器。实际上,我们对比我们更多的数据进行预测。
解决方案
要减少每个分区的内存占用,您必须在每个分区内创建批处理(使用grouped(batchSize)
)。因此,您比为每一行运行预测要快,并且您分配预定大小(batchSize)的张量。如果您调查tensorflowOnSpark scala 推理的代码,这就是他们所做的。您将在下面找到一个重新设计的实现示例,此代码可能无法编译,但您知道如何执行此操作。
lazy val sess = SavedModelBundle.load(modelPath, "serve").session
lazy val numberOfFeatures = 1
lazy val laggedFeatures = Seq("cost_day1", "cost_day2", "cost_day3")
lazy val numberOfOutputs = 1
val predictionsRDD = preprocessedData.rdd.mapPartitions { partition =>
partition.grouped(batchSize).flatMap { batchPreprocessed =>
val numberOfLines = batchPreprocessed.size
val featuresShape: Array[Long] = Array(numberOfLines, laggedFeatures.size / numberOfFeatures, numberOfFeatures)
val featuresBuffer: FloatBuffer = FloatBuffer.allocate(numberOfLines)
for (
featuresWithKey <- batchPreprocessed;
feature <- featuresWithKey.features
) {
featuresBuffer.put(feature)
}
featuresBuffer.flip()
val featuresTensor = Tensor.create(featuresShape, featuresBuffer)
val results: Tensor[_] = sess.runner
.feed("cost", featuresTensor)
.fetch("prediction")
.run.get(0)
val output = Array.ofDim[Float](results.numElements(), numberOfOutputs)
val outputArray: Array[Array[Float]] = results.copyTo(output)
results.close()
featuresTensor.close()
outputArray
}
}
spark.createDataFrame(predictionsRDD)
我们按照本期的建议使用 FloatBuffer 和 Shape 来创建张量
推荐阅读
- c# - C# 将变量 double 转换为浮点数
- android - AudioRecord 同时播放音频 - 访问输出播放数据
- git - 如何在检查先前的提交并对其进行更改后推送提交
- matlab - MATLAB - 使用 buffer() 分割向量的问题
- php - 在 Woocommerce 的任何页面上仅显示单个链接的产品类别文本
- optimization - 如何解释 docplex 优化库的日志输出
- html - 在 Blogger 博客中隐藏侧边栏后,如何将特定的帖子页面设为 100%?
- php - 数组中 php (json) 的最后一条记录然后加 1
- python - 我无法清除 kivy 的根
- java - BigInteger 非不变的替代方案/解决方法