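apache-spark - pyspark - Applying a UDF to a list of columns and returning multiple dataframes
Problem description
I created a udf that is meant to ffill and bfill a column and return a new imputed dataframe. The error is not in the function, since it runs fine.
See my function below: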
from pyspark.sql import Window
from pyspark.sql import functions as func
from pyspark.sql.functions import coalesce

def ffill_bfill(df, partition_by_col, order_by_col, col_to_imp):
    '''Forward fill and backward fill a column within each partition, ordered by order_by_col.

    Parameters:
    ------------
    df: Dataframe that the columns are in (Company wide? Company Narrow?)
    partition_by_col: String or list of strings. The column(s) to partition by.
    order_by_col: String or list of strings. This is the Year column until we get more granular time data!!
    col_to_imp: String (only works for one column). The name of the column to be imputed!!

    Return:
    ---------
    df: Dataframe
        Return df with the filled_cols.
    '''
    # window from the start of the partition up to the current row (forward fill)
    window_ff = Window.partitionBy(partition_by_col).orderBy(order_by_col).rowsBetween(Window.unboundedPreceding, Window.currentRow)
    # window from the current row to the end of the partition (backward fill)
    window_bf = Window.partitionBy(partition_by_col).orderBy(order_by_col).rowsBetween(Window.currentRow, Window.unboundedFollowing)
    # BACKWARD filled values: first non-null value at or after the current row
    s_bf = func.first(df[col_to_imp], ignorenulls=True).over(window_bf)
    # FORWARD filled values: last non-null value at or before the current row
    s_ff = func.last(df[col_to_imp], ignorenulls=True).over(window_ff)
    # add the filled columns to the dataframe passed in (df, not the global df_company_wide)
    imputed_df = df.withColumn(f'{col_to_imp}_bf', s_bf)\
                   .withColumn(f'{col_to_imp}_ff', s_ff)
    # fill in the nulls: original value first, then forward fill, then backward fill
    imputed_df = imputed_df.withColumn(f'{col_to_imp}_imp', coalesce(col_to_imp, f'{col_to_imp}_ff', f'{col_to_imp}_bf'))
    # keep the key columns, the original column and the imputed column
    cols_to_use = ['isin', 'company', 'year', col_to_imp] + [s for s in imputed_df.columns if col_to_imp in s and 'imp' in s]
    imputed_df_final = imputed_df.select(cols_to_use)
    return imputed_df_final
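For readers who want to check the fill order without a Spark session, the precedence used by the function above (the row's own value, else the last earlier non-null value, else the first later non-null value) can be sketched in plain Python for a single, already ordered partition. This is only an illustration: the helper name `ffill_then_bfill` is hypothetical and it does not use Spark at all.

```python
from typing import List, Optional


def ffill_then_bfill(values: List[Optional[float]]) -> List[Optional[float]]:
    """Fill None with the last earlier non-null value, else the first later one."""
    # Forward pass: carry the most recent non-null value forward.
    forward, last_seen = [], None
    for value in values:
        if value is not None:
            last_seen = value
        forward.append(last_seen)

    # Backward pass: carry the next non-null value backward.
    backward, next_seen = [], None
    for value in reversed(values):
        if value is not None:
            next_seen = value
        backward.append(next_seen)
    backward.reverse()

    # Same precedence as coalesce(col, col_ff, col_bf).
    return [
        v if v is not None else (f if f is not None else b)
        for v, f, b in zip(values, forward, backward)
    ]


# One partition ordered by year, with leading nulls.
print(ffill_then_bfill([None, None, None, 29.82, 29.82]))
```

Leading nulls can only be filled by the backward pass, which is why a forward fill alone would not be enough for partitions whose first known value appears late.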
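The problem is in the way I apply the function:
My intention is to apply the function to 4 columns and get back 4 imputed dataframes. I tried to do that with the following code: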
# Get the columns to be imputed in a list
features_to_impute = ['mobile_maximum_plan_for_one',
                      'mobile_minimum_plan_for_one',
                      'slowest_internet_speed',
                      'fastest_internet_speed']
# Return a dataframe and make available for SQL
for feature in features_to_impute:
    f"{feature}_imp"= ffill_bfill(df_company_wide,partition_by_col='isin',order_by_col='year',col_to_imp=f"'{feature}'")
    f"{feature}_imputed".createOrReplaceTempView(f"{feature}_imputed")
When I run the code above, I get this error:
SyntaxError: can't assign to literal
File "<command-575233896480136>", line 21
f"{feature}_imp"= ffill_bfill(df_company_wide,partition_by_col='isin',order_by_col='year',col_to_imp=f"'{feature}'")
^
SyntaxError: can't assign to literal
But when I apply the function to one column at a time (as shown below), it works:
mobile_minimum_plan_for_one_imputed = ffill_bfill(df_company_wide,partition_by_col='isin',order_by_col='year',col_to_imp='mobile_minimum_plan_for_one')
mobile_minimum_plan_for_one_imputed.show()
+------------+----------------+------+---------------------------+-------------------------------+
| isin| company| year|mobile_minimum_plan_for_one|mobile_minimum_plan_for_one_imp|
+------------+----------------+------+---------------------------+-------------------------------+
|BE0003810273| Proximus|2015.0| null| 11.19820828667413|
|BE0003810273| Proximus|2016.0| null| 11.19820828667413|
|BE0003810273| Proximus|2017.0| null| 11.19820828667413|
|BE0003810273| Proximus|2018.0| null| 11.19820828667413|
|BE0003810273| Proximus|2019.0| 11.19820828667413| 11.19820828667413|
|CH0008742519| Swisscom|2015.0| null| 29.82|
|CH0008742519| Swisscom|2016.0| null| 29.82|
|CH0008742519| Swisscom|2017.0| null| 29.82|
|CH0008742519| Swisscom|2018.0| 29.82| 29.82|
|CH0008742519| Swisscom|2019.0| 29.82| 29.82|
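Can someone shed light on how to fix the for loop so that it returns 4 separate dataframes with the imputed values? A good explanation would add a lot of value!
Thanks in advance.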
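Solution
Your code does not run because you are assigning the dataframe to a string (the f-string on the left-hand side of the =) rather than to a variable. In any case, using dynamically generated variable names is not good practice. Consider using a dictionary for this purpose. Two details matter when doing so: store and look up each dataframe under the same key, and pass the bare column name as col_to_imp, because the f"'{feature}'" used in the question wraps the name in literal single quotes.
features = dict()
for feature in features_to_impute:
    features[f"{feature}_imputed"] = ffill_bfill(df_company_wide,partition_by_col='isin',order_by_col='year',col_to_imp=feature)
    features[f"{feature}_imputed"].createOrReplaceTempView(f"{feature}_imputed")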
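The two points of this answer can be checked without Spark. The sketch below is a minimal illustration, not the asker's pipeline: `fake_impute` is a hypothetical stand-in for `ffill_bfill` that returns a label instead of a dataframe, and `compile` is used only to show that an f-string on the left of `=` is rejected before any code runs.

```python
# Hypothetical stand-in for ffill_bfill: returns a label instead of a DataFrame.
def fake_impute(col_to_imp: str) -> str:
    return f"imputed<{col_to_imp}>"


features_to_impute = ["mobile_maximum_plan_for_one", "mobile_minimum_plan_for_one"]

# An f-string is an expression, not a name, so it cannot be an assignment target.
# The statement fails at compile time; the exact message varies by Python version.
try:
    compile('f"{feature}_imp" = 1', "<example>", "exec")
    assignment_compiles = True
except SyntaxError:
    assignment_compiles = False

# Dictionary alternative: the generated string becomes a key, not a variable name.
results = {}
for feature in features_to_impute:
    key = f"{feature}_imputed"
    # Pass the bare column name, with no extra quote characters around it.
    results[key] = fake_impute(col_to_imp=feature)

print(assignment_compiles)
print(sorted(results))
```

Each result is then retrieved by its key, for example `results["mobile_maximum_plan_for_one_imputed"]`, which keeps the set of dataframes easy to iterate over and avoids cluttering the namespace with generated names.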