python - 如何将大型 python 模型应用于 pyspark-dataframe?
问题描述
我有:
- 包含一些数据(特征)的大型数据框(parquet 格式,100.000.000 行,4.5TB 大小)
- 几个巨大的 ML 模型(每个需要 5-15GB 的 RAM)
- Spark 集群 (AWS EMR),典型的节点配置是 8 个 CPU,32 个 RAM,可以根据需要进行更改。
我想使用 PySpark 应用它们,但我总是遇到一些有线错误,例如:
- OOM
- 随机超时(节点不返回任何结果)-> 被 YARN 管理器杀死的节点
我通常使用类似的代码
def apply_model(partition):
model = load(...) # load model only when apply this function to avoid serialization issue
for row in partition:
yield model.infer(row)
或者
def apply_model(partition):
model = load(...) # load model only when apply this function to
yield from model.infer(partition)
并使用
df.select(...).rdd.mapPartitions(apply_model)
由于序列化原因,我无法broadcast
建模。
问题 - 如何应用基于 python/any-non-jvm 的大模型来触发数据帧并避免触发异常?
解决方案
以下是一些有助于提高工作绩效的额外建议:
我要做的第一个更改是减小分区大小。如果我目前理解正确,您输入的数据为 4.5TB。这意味着如果您有 1000 个分区,那么您最终将在每个执行程序上每个分区发送 4.5GB!这个大小被认为是相当大的,相反我会尝试将分区大小保持在 250-500MB 之间。大致在您的情况下,这意味着〜10000(4.5TB / 500MB)分区。
通过添加更多执行器来增加并行度。这将提高数据局部性的级别,从而减少执行时间。理想情况下,每个执行器应该有 5 个核心,每个集群节点应该有两个执行器(如果可能)。每个执行程序的最大内核数不应高于 5,因为这会导致 I/O 瓶颈(当/如果使用磁盘存储时)。
至于内存,我认为@rluta 的建议绰绰有余。通常,执行程序内存的太大值会对 Java GC 时间产生负面影响,因此 10GB 的上限应该是
spark.executor.memory
.
推荐阅读
- javascript - 未捕获的类型错误:无法读取 null 的属性“forEach”
- swift - 在某些情况下通过 Swift 中的 UI TestCases
- xmpp - 正确使用 XMPP 处理消息和历史记录
- angular - 如何在 api 请求标头中插入 api 身份验证所需的 jwt 令牌?
- php - 如何在laravel中返回数组数组
- c# - Blazor 服务器:努力向 Google 身份验证添加策略以限制仅访问我的帐户
- architecture - 具有 N 层的 ASP.Net MVC
- ios - Ios 金刚鹦鹉周期表添加图像
- blockchain - 测试网上的 Chainlink 预言机
- python - Numpy 数组跨越 2 个值并获取索引