首页 > 解决方案 > 使用 QuantileDiscretizer 在 pyspark 中的百分位数排名

问题描述

我想知道是否有可能获得在 pyspark中percentile_rank使用变压器的结果。QuantileDiscretizer

目的是我试图避免计算percent_rank整个列,因为它会产生以下错误:

WARN WindowExec: No Partition Defined for Window operation! 
Moving all data to a single partition, this can cause serious performance degradation.

我遵循的方法是先使用QuantileDiscretizer然后归一化为[0,1]:

from pyspark.sql.window import Window
from pyspark.ml.feature import QuantileDiscretizer
from scipy.stats import gamma

X1 = gamma.rvs(0.2, size=1000)

df = spark.createDataFrame(pd.DataFrame(X1, columns=["x"]))

df = df.withColumn("perc_rank", F.percent_rank().over(Window.orderBy("x")))
df = QuantileDiscretizer(numBuckets=df.count()+1,\
                         inputCol="x",\
                         outputCol="q_discretizer").fit(df).transform(df)

agg_values = df.agg(F.max(df["q_discretizer"]).alias("maxval"),\
                    F.min(df["q_discretizer"]).alias("minval")).collect()[0]

xmax, xmin = agg_values.__getitem__("maxval"), agg_values.__getitem__("minval")
normalize = F.udf(lambda x: (x-xmin)/(xmax-xmin))

df = df.withColumn("perc_discretizer", normalize("q_discretizer"))
df = df.withColumn("error", F.round(F.abs(F.col("perc_discretizer")- F.col("perc_rank")),6) )
print(df.select(F.max("error")).show())
df.show(5)

但是,似乎随着数据点数量的增加错误会增加,所以我不确定这是不是正确的方法。

是否可以使用QuantileDiscretizer获取 percentile_rank ?

或者,有没有一种方法可以percentile_rank有效地计算整个列?

标签: pysparkapache-spark-sqlquantile

解决方案


那么您可以使用以下内容来避免警告消息:

X1 = gamma.rvs(0.2, size=10)
df = spark.createDataFrame(pd.DataFrame(X1, columns=["x"]))
df = df.withColumn("dummyCol", F.lit("some_val"))
win = Window.partitionBy("dummyCol").orderBy("x")
df = df.withColumn("perc_rank", F.percent_rank().over(win)).drop("dummyCol")

但尽管如此,数据仍将被移动到单个工作人员,我认为没有更好的选择来避免这里的洗牌,因为需要对完整的列进行排序。

如果您在同一列上有多个窗口,您可以尝试对数据进行预分区,然后应用排名函数。


推荐阅读