python - Pyspark 使用 Window 函数和我自己的函数
问题描述
我有一个 Pandas 的代码,它计算我在大小为 x 的窗口上的线性回归的 R2。查看我的代码:
def lr_r2_Sklearn(data):
data = np.array(data)
X = pd.Series(list(range(0,len(data),1))).values.reshape(-1,1)
Y = data.reshape(-1,1)
regressor = LinearRegression()
regressor.fit(X,Y)
return(regressor.score(X,Y))
r2_rolling = df[['value']].rolling(300).agg([lr_r2_Sklearn])
我正在滚动大小为 300 并计算每个窗口的 r2。我希望做同样的事情,但使用 pyspark 和 spark 数据框。我知道我必须使用Window功能,但是它比pandas更难理解,所以我迷路了......
我有这个,但我不知道如何使它工作。
w = Window().partitionBy(lit(1)).rowsBetween(-299,0)
data.select(lr_r2('value').over(w).alias('r2')).show()
(lr_r2 返回 r2)
谢谢 !
解决方案
您需要一个带有有界条件的带有 pandas udf 的 udf。这在 spark3.0 之前是不可能的,并且正在开发中。请在此处参考答案:要应用于 PySpark 中的窗口的用户定义函数? 但是,您可以探索 pyspark 的 ml 包:http://spark.apache.org/docs/2.4.0/api/python/pyspark.ml.html#pyspark.ml.classification.LinearSVC 所以 您可以定义这样的模型作为 linearSVC 并在组装后将数据帧的各个部分传递给它。我建议使用由阶段、汇编器和分类器组成的管道,然后使用数据帧的各个部分通过一些唯一的 id 对其进行过滤,从而在循环中调用它们。
推荐阅读
- php - Codeigniter 打印 Domp PDF 分页限制每页 5 条记录 在 foreach 表中
- rest - 我应该声明什么 POST 方法参数类型来接收未序列化的数据?
- vbscript - HTA + vbscript 文件未在 Windows 10 机器中加载
- flutter - 为什么 Flutter 不会立即更新 setState 中的变量?
- kubernetes - 将 tcpdump 输出存储在另一个 POD 中 - kubernetes
- slack - 是否可以在 CPLEX 中为 MILP 执行敏感性分析?
- excel - 查找输入数据 - 运行时错误 13 - 类型不匹配
- java - 如何将类路径添加到maven
- javascript - JS返回结构
- swiftui - onTapGesture 关闭键盘使视图无法操作