首页 > 解决方案 > Pyspark 使用 Window 函数和我自己的函数

问题描述

我有一个 Pandas 的代码,它计算我在大小为 x 的窗口上的线性回归的 R2。查看我的代码:

def lr_r2_Sklearn(data):
    data = np.array(data)
    X = pd.Series(list(range(0,len(data),1))).values.reshape(-1,1)
    Y = data.reshape(-1,1)

    regressor = LinearRegression()  
    regressor.fit(X,Y)

    return(regressor.score(X,Y))

r2_rolling = df[['value']].rolling(300).agg([lr_r2_Sklearn])

我正在滚动大小为 300 并计算每个窗口的 r2。我希望做同样的事情,但使用 pyspark 和 spark 数据框。我知道我必须使用Window功能,但是它比pandas更难理解,所以我迷路了......

我有这个,但我不知道如何使它工作。

w = Window().partitionBy(lit(1)).rowsBetween(-299,0)
data.select(lr_r2('value').over(w).alias('r2')).show()

(lr_r2 返回 r2)

谢谢 !

标签: pythonpandaspysparkwindow

解决方案


您需要一个带有有界条件的带有 pandas udf 的 udf。这在 spark3.0 之前是不可能的,并且正在开发中。请在此处参考答案:要应用于 PySpark 中的窗口的用户定义函数? 但是,您可以探索 pyspark 的 ml 包:http://spark.apache.org/docs/2.4.0/api/python/pyspark.ml.html#pyspark.ml.classification.LinearSVC 所以 您可以定义这样的模型作为 linearSVC 并在组装后将数据帧的各个部分传递给它。我建议使用由阶段、汇编器和分类器组成的管道,然后使用数据帧的各个部分通过一些唯一的 id 对其进行过滤,从而在循环中调用它们。


推荐阅读