首页 > 解决方案 > GCP 上的 PySpark PandasUDF - 内存分配

问题描述

我正在使用 pandas udf 在 Dataproc(Spark)中的 GCP 上训练许多 ML 模型。主要思想是我有一个分组变量,它代表我的数据框中的各种数据集,我运行这样的东西:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def test_train(grp_df):
    
  #train model on grp_df
  #evaluate model 
  #return metrics on 
 
    return (metrics)

result=df.groupBy('group_id').apply(test_train)

这工作正常,除非我使用非采样数据,其中返回的错误似乎与内存问题有关。这些消息是神秘的(对我来说),但如果我对它运行的数据进行采样,如果我不这样做,它就会失败。错误消息是这样的:

OSError:在大小为 573373864 的文件中读取越界(偏移量 = 631044336,大小 = 69873416)

或者

因超出内存限制而被 YARN 杀死的容器。使用了 24.5 GB 的 24 GB 物理内存。由于 YARN-4714,请考虑提高 spark.yarn.executor.memoryOverhead 或禁用 yarn.nodemanager.vmem-check-enabled。

我的问题是如何在集群中设置内存以使其正常工作?

我了解每组数据和正在运行的进程都需要完全适合执行程序的内存。我目前有一个 4-worker 集群,其中包含以下内容:

在此处输入图像描述

如果我认为最大group_id中的最大数据大小需要150GB内存,看来我真的需要每台机器一次操作一个group_id。与拥有单个工作人员或 VM 相比,我至少获得了 4 倍的速度。

如果我执行以下操作,这实际上是否为每台机器创建了 1 个执行程序,该执行程序可以访问所有内核减去 1 和 180 GB 的内存?因此,如果理论上最大的数据组可以在具有这么多 RAM 的单个 VM 上工作,那么这个过程应该可以工作吗?

spark = SparkSession.builder \
  .appName('test') \
  .config('spark.executor.memory', '180g') \
  .config('spark.executor.cores', '63') \
  .config('spark.executor.instances', '1') \
  .getOrCreate() 

标签: apache-sparkpysparkgoogle-cloud-storagegoogle-cloud-dataprocpyarrow

解决方案


让我们将答案分为 3 个部分:

  1. 执行人数量
  2. GroupBy 操作
  3. 你的执行者记忆

执行人数量

直接来自Spark 文档

 spark.executor.instances

 Initial number of executors to run if dynamic allocation is enabled.
 If `--num-executors` (or `spark.executor.instances`) is set and larger
 than this value, it will be used as the initial number of executors.

所以,除非启用动态分配,否则您只会得到一个不会扩展的执行器。

spark.executor.instances您可以通过启用动态执行程序分配来配置或设置基于工作负载的自动扩展,手动增加此类执行程序的数量。

要启用动态分配,您还必须启用 shuffle 服务,该服务允许您安全地删除执行程序。这可以通过设置两个配置来完成:

  1. spark.shuffle.service.enabledtrue. 默认为假。
  2. spark.dynamicAllocation.enabledtrue. 默认为假。

通过...分组

我观察到group_by在 Spark 中使用哈希聚合来完成,这意味着给定x的分区数量,并且唯一的 group_by 值大于x,多个 group by 值将位于同一个分区中。

例如,假设 group_by 列中的两个唯一值分别为 100GiBa1a2150GiB 的总行数。

如果它们属于单独的分区,您的应用程序将运行良好,因为每个分区都将适合执行程序内存 (180GiB),这是内存中处理所必需的,如果它们不适合剩余内存,则剩余的将溢出到磁盘. 但是,如果它们属于同一个分区,您的分区将不适合执行程序内存(180GiB < 250GiB),您将获得 OOM。

spark.default.parallelism在这种情况下,配置为将数据分布在相当多的分区上或应用加盐或其他技术来消除数据偏斜是很有用的。

如果您的数据不是太倾斜,您可以正确地说,只要您的 executor 可以处理最大的 groupby 值,它应该可以工作,因为您的数据将被均匀分区,并且发生上述情况的机会很少。

还有一点需要注意的是,由于你使用group_by的是需要数据shuffle的,所以你也应该开启shuffle服务。如果没有 shuffle 服务,每个 executor 必须在完成自己的工作的同时服务 shuffle 请求。

执行器内存

Spark 中的总执行器内存(实际执行器容器大小)是通过将分配给容器的执行器内存与分配的memoryOverhead. 这些memoryOverhead帐户包括 VM 开销、实习字符串、其他本机开销等。所以,

Total executor memory = (spark.executor.memory + spark.executor.memoryOverhead)
spark.executor.memoryOverhead = max(executorMemory*0.10, 384 MiB)

基于此,您可以根据您的数据将执行程序配置为具有适当的大小。因此,当您将 设置为spark.executor.memory180GiB,启动的实际执行程序应该是198GiB.


推荐阅读