apache-spark - 有没有办法限制 PySpark 中某些内存密集型 UDF 计算的节点并行化?
问题描述
背景
PySpark 允许您将任意 pandas 函数应用于 Spark DataFrame 组,如下所示:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()
在后台,Spark 将数据从工作节点分发df
到工作节点,根据“id”进行分区,将用户定义的 pandas 函数并行应用于每个组,然后将这些结果合并。
问题 我要运行的 pandas UDF 非常占用内存。当 Spark 尝试在同一个节点上并行运行多个 UDF 实例(例如 64 个)时,该节点会耗尽内存,并且 UDF 会引发内存错误。
问题 有没有办法告诉 Spark:“跨节点并行化这个 Grouped UDF,但每个节点上最多只能并行运行 k 个 UDF 实例。”?(例如,k=1 表示每个节点一次只执行一个 UDF 实例)。
解决方案的先前尝试
分解df
成更小的块,并使用 for 循环依次处理它们。为什么这很糟糕?因为我们希望 Spark 能够利用跨节点的并行化!
解决方案
推荐阅读
- r - 使用 gsub 替换 R 中的多个单词
- mysql - 用 mysql 分页 Nodejs 表达
- c# - C# .net Core 异常 - 子类中的更改消息
- android - react-native-device-info 在 android 上使应用程序崩溃
- dart - Flutter - 在 null 上调用了方法“map”
- google-apps - “与 Google 集成”按钮突然消失(接收 404)
- kotlin - 在 kotlin 中使用带有 rxjava 的密封类时出现类型不匹配
- excel - 使用activex控件从左到右滚动excel窗口
- jquery-select2 - 如何在select2删除按钮上添加条件
- configure - 配置:错误:未找到 gdal-includes