首页 > 解决方案 > 如何在 Spark 中以多个系列作为输入最有效地使用 Pandas UDF

问题描述

我有一些 PySpark 代码,旨在在 pyspark 数据帧上运行在 sklearn 中训练的机器学习模型,如下所示:

from sklearn.ensemble import RandomForestRegressor
X = np.random.rand(1000, 100)
y = np.random.randint(2, size=1000)
tree = RandomForestRegressor(n_jobs=4)
tree.fit(X, y)
pdf = pd.DataFrame(X)
df = spark.createDataFrame(pdf)
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double')
# Input/output are both a pandas.Series of doubles
def pandas_plus_one(*args):
    return pd.Series(tree.predict(pd.concat([args[i] for i in range(100)],axis=1)))

df = df.withColumn('result', pandas_plus_one(*[df[i] for i in range(100)]))

我的问题是,这是用 PySpark 做事的最有效方法吗?特别是,我想避免必须执行 pd.concat ,这涉及将所有系列(无论如何可能在内存中相邻)复制到 UDF 函数内的新 pandas DataFrame。理想的解决方案是让 Pandas UDF 接受 DataFrame 作为输入,但我还没有找到让它工作的方法。

注意:我不是在寻找涉及 SparkML scikit-spark 等的解决方案。

标签: pandasapache-sparkpyspark

解决方案


推荐阅读