首页 > 解决方案 > 有没有办法限制 PySpark 中某些内存密集型 UDF 计算的节点并行化?

问题描述

背景

PySpark 允许您将任意 pandas 函数应用于 Spark DataFrame 组,如下所示:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()

在后台,Spark 将数据从工作节点分发df到工作节点,根据“id”进行分区,将用户定义的 pandas 函数并行应用于每个组,然后将这些结果合并。

问题 我要运行的 pandas UDF 非常占用内存。当 Spark 尝试在同一个节点上并行运行多个 UDF 实例(例如 64 个)时,该节点会耗尽内存,并且 UDF 会引发内存错误。

问题 有没有办法告诉 Spark:“跨节点并行化这个 Grouped UDF,但每个节点上最多只能并行运行 k 个 UDF 实例。”?(例如,k=1 表示每个节点一次只执行一个 UDF 实例)。

解决方案的先前尝试 分解df成更小的块,并使用 for 循环依次处理它们。为什么这很糟糕?因为我们希望 Spark 能够利用跨节点的并行化!

标签: apache-sparkpyspark

解决方案


推荐阅读