首页 > 解决方案 > 如何将大型 python 模型应用于 pyspark-dataframe?

问题描述

我有:

我想使用 PySpark 应用它们,但我总是遇到一些有线错误,例如:

我通常使用类似的代码

def apply_model(partition):
    model = load(...)  # load model only when apply this function to avoid serialization issue
    for row in partition:
        yield model.infer(row)

或者

def apply_model(partition):
    model = load(...)  # load model only when apply this function to 
    yield from model.infer(partition)

并使用

df.select(...).rdd.mapPartitions(apply_model)

由于序列化原因,我无法broadcast建模。

问题 - 如何应用基于 python/any-non-jvm 的大模型来触发数据帧并避免触发异常?

标签: pythonapache-sparkmachine-learningpysparkpyspark-sql

解决方案


以下是一些有助于提高工作绩效的额外建议:

  • 我要做的第一个更改是减小分区大小。如果我目前理解正确,您输入的数据为 4.5TB。这意味着如果您有 1000 个分区,那么您最终将在每个执行程序上每个分区发送 4.5GB!这个大小被认为是相当的,相反我会尝试将分区大小保持在 250-500MB 之间。大致在您的情况下,这意味着〜10000(4.5TB / 500MB)分区。

  • 通过添加更多执行器来增加并行度。这将提高数据局部性的级别,从而减少执行时间。理想情况下,每个执行器应该有 5 个核心,每个集群节点应该有两个执行器(如果可能)。每个执行程序的最大内核数不应高于 5,因为这会导致 I/O 瓶颈(当/如果使用磁盘存储时)。

  • 至于内存,我认为@rluta 的建议绰绰有余。通常,执行程序内存的太大值会对 Java GC 时间产生负面影响,因此 10GB 的上限应该是spark.executor.memory.


推荐阅读