首页 > 解决方案 > 类方法为 Pyspark udf

问题描述

我有以下代码

import numpy as np
import pandas as pd

class MyClass:
    def __init__(self, a: pd.Series):
        self.a = a

    def f(self, b: pd.Series):
        return np.exp(a) + b

我还有一个带有双列的 Pyspark 数据框ab. 我想跑

df.withColumn('c', MyClass(df['a']).f(df['b']))

这当然失败了。我如何正确调整代码MyClass以使其正常工作。(请注意,我不能简单地用fPyspark 函数来编写函数。

标签: pythonpandasapache-sparkpysparkuser-defined-functions

解决方案


您可以添加一个 UDF 来包装该类:

import pyspark.sql.functions as F
import pandas as pd
import numpy as np

class MyClass:
    def __init__(self, a: pd.Series):
        self.a = a
    def f(self, b: pd.Series):
        return np.exp(self.a) + b

@F.pandas_udf('float')
def myClassUDF(a: pd.Series, b: pd.Series) -> pd.Series:
    return MyClass(a).f(b)

df = spark.createDataFrame([[0,1], [0,2]],['a','b'])

df.withColumn('c', myClassUDF('a','b')).show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  0|  1|2.0|
|  0|  2|3.0|
+---+---+---+

推荐阅读