首页 > 解决方案 > 如何用 PYSPARK 中的平均值替换空值?

问题描述

如何col1用平均值替换列中的空值?但是,有以下条件:

id   col1
1    12
1    NaN
1    14
1    10
2    22
2    20
2    NaN
3    NaN
3    NaN

的 NaN 值id=1应替换为col1计算的平均值id=1,即12((12+14+10)/3))。这同样适用于id=2id=3可以忽略和 NaN等情况。

这是我尝试过的:

calcul = df.groupby("id").agg(func.mean("col1"))

df = df.withColumn("col1", func.when((df["col1"].isNull()), calcul.where(func.col("id")==df["id"])).otherwise(func.col("col1")))

请删除熊猫的重复标签。我使用 PYSPARK。

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


首先null从原始数据框和 groupby 中删除id并取平均值:

df_ave = df1.dropna().groupby('id').agg(F.mean('col1').alias('mean'))
df_ave.show()

输出:

+---+----+
| id|mean|
+---+----+
|  1|12.0|
|  2|21.0|
+---+----+

然后将其与您的原始数据框连接起来:

df1 = df1.join(df_ave, on='id', how='left')
df1.show()

输出:

+---+----+----+
| id|col1|mean|
+---+----+----+
|  1|12.0|12.0|
|  1| NaN|12.0|
|  1|14.0|12.0|
|  1|10.0|12.0|
|  3| NaN|null|
|  3| NaN|null|
|  2|22.0|21.0|
|  2|20.0|21.0|
|  2| NaN|21.0|
+---+----+----+

然后使用以下when语句并创建新列col1_new

df1 = df1.withColumn('col1_new', F.when( df1.col1=='NaN', df1.mean).otherwise(df1.col1))
df1.show()

输出:

+---+----+----+--------+
| id|col1|mean|col1_new|
+---+----+----+--------+
|  1|12.0|12.0|    12.0|
|  1| NaN|12.0|    12.0|
|  1|14.0|12.0|    14.0|
|  1|10.0|12.0|    10.0|
|  3| NaN|null|    null|
|  3| NaN|null|    null|
|  2|22.0|21.0|    22.0|
|  2|20.0|21.0|    20.0|
|  2| NaN|21.0|    21.0|
+---+----+----+--------+

您还可以删除不需要的列并重命名col1_newcol1.

df1 = df1.select('id', 'col1_new').withColumnRenamed('col1_new', 'col1')
df1.show()

输出:

+---+----+
| id|col1|
+---+----+
|  1|12.0|
|  1|12.0|
|  1|14.0|
|  1|10.0|
|  3|null|
|  3|null|
|  2|22.0|
|  2|20.0|
|  2|21.0|
+---+----+

推荐阅读