首页 > 解决方案 > PySpark groupby applyInPandas 将对象保存为文件问题

问题描述

我在 Jupyter Notebook 上以本地模式在 Windows 计算机上运行 PySpark 3.1。我在 Spark DataFrame 上调用“applyInPandas”。

下面的函数对输入的 Pandas DataFrame 应用了一些数据转换,并训练了一个 SGBT 模型。然后它将训练好的模型序列化为二进制并作为对象保存到 S3 存储桶中。最后它返回DataFrame。我从最后一行按两列分组的 Spark DataFrame 调用此函数。我没有收到错误,返回的 DataFrame 与输入的长度相同。返回每个组的数据。

问题是保存的模型对象。当每个组都应该有模型时,只有 2 组的对象保存在 S3 中。没有丢失/错误的数据点会导致模型训练失败。(无论如何我都会收到错误或警告。)到目前为止我尝试过的内容:

所以我怀疑这是关于并行性和分布的,但我想不通。已经谢谢你了。

def train_sgbt(pdf):      
       ##Some data transformations here##    
       #Train the model
       sgbt_mdl=GradientBoostingRegressor(--Params.--).fit(--Params.--)
       sgbt_mdl_b=pickle.dumps(sgbt_mdl) #Serialize
       #Initiate s3_client
       s3_client = boto3.client(--Params.--)
       #Put file in S3
       s3_client.put_object(Body=sgbt_mdl_b, Bucket='my-bucket-name', 
            Key="models/BT_"+str(pdf.latGroup_m[0])+"_"+str(pdf.lonGroup_m[0])+".mdl")    
       return pdf

dummy_df=val_large_df.groupby("latGroup_m","lonGroup_m").applyInPandas(train_sgbt, 
           schema="fcast_error double")
dummy_df.show()

标签: pythonapache-sparkpyspark

解决方案


Spark 评估dummy_df 惰性,因此train_sgbt只会为完成 Spark 操作所需的组调用。

这里的 Spark 动作是show(). 此操作仅打印前 20 行,因此train_sgbt仅对前 20 行中至少有一个元素的组调用。Spark可能会评估更多组,但不能保证。

解决问题的一种方法是调用另一个动作,例如csv


推荐阅读