首页 > 解决方案 > Tensorflow Java 在 YARN 上使用过多内存和火花

问题描述

当使用 tensorflow java 进行推理时,使作业在 YARN 上运行的内存量异常大。这项工作在我的计算机上完美运行(2 核 16Gb RAM),需要 35 分钟才能完成。但是当我尝试在 YARN 上使用 10 个执行程序、16Gb 内存和 16Gb memoryOverhead 运行它时,执行程序会因为使用太多内存而被杀死。

使用 YARN 2.7.3 和 Spark 2.2.1 在 Hortonworks 集群上运行预测。以前我们使用 DL4J 进行推理,一切都在 3 分钟内运行。张量在使用后正确关闭,我们使用 mapPartition 进行预测。每个任务包含大约 20.000 条记录 (1Mb),因此这将使输入张量为 2.000.000x14,输出张量为 2.000.000 (5Mb)。

在 YARN 上运行时传递给 spark 的选项

--master yarn --deploy-mode cluster --driver-memory 16G --num-executors 10 --executor-memory 16G --executor-cores 2 --conf spark.driver.memoryOverhead=16G --conf spark.yarn.executor.memoryOverhead=16G --conf spark.sql.shuffle.partitions=200 --conf spark.tasks.cpu=2

如果我们设置 spark.sql.shuffle.partitions=2000,这个配置可能会起作用,但需要 3 小时

更新:

本地和集群之间的差异实际上是由于缺少过滤器。实际上,我们对比我们更多的数据进行预测。

标签: apache-sparktensorflowhadoop-yarn

解决方案


要减少每个分区的内存占用,您必须在每个分区内创建批处理(使用grouped(batchSize))。因此,您比为每一行运行预测要快,并且您分配预定大小(batchSize)的张量。如果您调查tensorflowOnSpark scala 推理的代码,这就是他们所做的。您将在下面找到一个重新设计的实现示例,此代码可能无法编译,但您知道如何执行此操作。

    lazy val sess = SavedModelBundle.load(modelPath, "serve").session
    lazy val numberOfFeatures = 1
    lazy val laggedFeatures = Seq("cost_day1", "cost_day2", "cost_day3")
    lazy val numberOfOutputs = 1
    val predictionsRDD = preprocessedData.rdd.mapPartitions { partition =>
        partition.grouped(batchSize).flatMap { batchPreprocessed =>
          val numberOfLines = batchPreprocessed.size
          val featuresShape: Array[Long] = Array(numberOfLines, laggedFeatures.size / numberOfFeatures, numberOfFeatures)

          val featuresBuffer: FloatBuffer = FloatBuffer.allocate(numberOfLines)

          for (
            featuresWithKey <- batchPreprocessed;
            feature <- featuresWithKey.features
          ) {
            featuresBuffer.put(feature)
          }
          featuresBuffer.flip()
          val featuresTensor = Tensor.create(featuresShape, featuresBuffer)

          val results: Tensor[_] = sess.runner
            .feed("cost", featuresTensor)
            .fetch("prediction")
            .run.get(0)

          val output = Array.ofDim[Float](results.numElements(), numberOfOutputs)
          val outputArray: Array[Array[Float]] = results.copyTo(output)

          results.close()
          featuresTensor.close()
          outputArray
        }
    }
    spark.createDataFrame(predictionsRDD)

我们按照本期的建议使用 FloatBuffer 和 Shape 来创建张量


推荐阅读