首页 > 解决方案 > 为什么 spark.ml CrossValidator 使用大数据集提供“广播大小为 X 的大型任务二进制文件”?

问题描述

问题:

我一直致力于使用 Pyspark 和 Spark ml 库分发 CrossValidation 过程,因此与常规顺序计算(即 scikit)相比,它需要的时间更少。但是,我在这样做时遇到了一些问题。具体来说,当我开始工作时,我不断收到消息“<em>Broadcasting large task binary with size X”(X 是从 1700 KiB 到 6 MiB 的数字)。在我离开工作一段时间后,它最终以消息“<em>Job X 由于 SparkContext 已关闭而被取消”(对于很多 Xs = 工作)和“<em>ERROR TransportRequestHandler: Error while invoking RpcHandler# receive() 用于单向消息。org.apache.spark.SparkException:找不到 CoarseGrainedScheduler”。

推理:

因为我不得不在“<a href="https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/tuning.html# CrossValidator" rel="noreferrer">pyspark.ml.tuning#CrossValidator",我对它的运作方式非常熟悉,知道它分配任务的方式是通过对数据集的每个拆分并行化模型的训练不同的参数设置。也就是说,CrossValidator _fit 每次都将整个数据集发送给 executor,以便在每个 executor 中单独训练一个具有特定参数组合的模型,而且 Spark 似乎不喜欢过多地广播数据集。这是 pyspark.ml.tunning _fit 方法的相关部分:

        for i in range(nFolds):
        validation = datasets[i][1].cache()
        train = datasets[i][0].cache()

        tasks = _parallelFitTasks(est, train, eva, validation, epm, collectSubModelsParam)
        for j, metric, subModel in pool.imap_unordered(lambda f: f(), tasks):
            metrics[j] += (metric / nFolds)
            if collectSubModelsParam:
                subModels[i][j] = subModel

        validation.unpersist()
        train.unpersist()

我试过的:

我已经尝试了最常见的广播警告解决方案,即使我已经想象它们在我的情况下不起作用。具体来说,我修改了数据的分区和并行化参数,以及执行器和驱动程序的内存大小。

我很确定ml库中是否存在CrossValidator的分布式实现是因为它实际上很有用。但是,我一定遗漏了一些东西,因为如果我的数据集很大并且需要广播很多次(因为实施),我无法考虑如何使它工作。也许我错过了什么?

标签: apache-sparkmachine-learningpysparkapache-spark-ml

解决方案


推荐阅读