python - PySpark groupby applyInPandas 将对象保存为文件问题
问题描述
我在 Jupyter Notebook 上以本地模式在 Windows 计算机上运行 PySpark 3.1。我在 Spark DataFrame 上调用“applyInPandas”。
下面的函数对输入的 Pandas DataFrame 应用了一些数据转换,并训练了一个 SGBT 模型。然后它将训练好的模型序列化为二进制并作为对象保存到 S3 存储桶中。最后它返回DataFrame。我从最后一行按两列分组的 Spark DataFrame 调用此函数。我没有收到错误,返回的 DataFrame 与输入的长度相同。返回每个组的数据。
问题是保存的模型对象。当每个组都应该有模型时,只有 2 组的对象保存在 S3 中。没有丢失/错误的数据点会导致模型训练失败。(无论如何我都会收到错误或警告。)到目前为止我尝试过的内容:
- 替换 S3 并保存到本地文件系统:结果相同。
- 将“pickle”替换为“joblib”和“BytesIO”:结果相同。
- 在调用函数之前重新分区:现在我为不同的组保存了更多的对象,但不是全部。[我通过在最后一行调用“val_large_df.coalesce(1).groupby('la...”来做到这一点。]
所以我怀疑这是关于并行性和分布的,但我想不通。已经谢谢你了。
def train_sgbt(pdf):
##Some data transformations here##
#Train the model
sgbt_mdl=GradientBoostingRegressor(--Params.--).fit(--Params.--)
sgbt_mdl_b=pickle.dumps(sgbt_mdl) #Serialize
#Initiate s3_client
s3_client = boto3.client(--Params.--)
#Put file in S3
s3_client.put_object(Body=sgbt_mdl_b, Bucket='my-bucket-name',
Key="models/BT_"+str(pdf.latGroup_m[0])+"_"+str(pdf.lonGroup_m[0])+".mdl")
return pdf
dummy_df=val_large_df.groupby("latGroup_m","lonGroup_m").applyInPandas(train_sgbt,
schema="fcast_error double")
dummy_df.show()
解决方案
Spark 评估dummy_df
惰性,因此train_sgbt
只会为完成 Spark 操作所需的组调用。
这里的 Spark 动作是show()
. 此操作仅打印前 20 行,因此train_sgbt
仅对前 20 行中至少有一个元素的组调用。Spark可能会评估更多组,但不能保证。
解决问题的一种方法是调用另一个动作,例如csv
。
推荐阅读
- r - 基于自定义中断的ggplot颜色Y轴
- excel - Ruby 阅读 Excel 在值中包含额外的 '[" "]',我该如何删除它们?
- r - 使用 R 的前向选择方法
- postgresql - 创建搜索“%txt”时使用的索引
- sql-server - 是否可以在 T-SQL 中使用带有交叉连接的连接提示?
- powerbi - Power BI 行级安全性 (RLS),如果匹配字段的一部分(不是整个字段)
- java - java正则表达式捕获或字符串开头的制表符或空格
- c++ - 带有条件的 constexpr 类成员函数内的非 constexpr 变体成员调用编译 - 为什么?
- java - 如何将人工签名添加到从签名板捕获的 PDF 并使用 Java 和 iTEXT7 将其设置为 PDF?
- python-3.x - 正则表达式问题在 BeautifulSoup 中查找 html 等标签(可自定义)