首页 > 解决方案 > UDF 函数需要很长时间

问题描述

我有一个这样的数据框:

SCORE = spark.createDataFrame(
    [
      ('a', "Joe", 1),
      ('b', "Doe", 2),
      ('c', "Carl", 3),
      ('d', "CJ", 4),
      ('e', "Tom", 5),
    ], 
      StructType(
        [
            StructField("id", StringType(), False),
            StructField("user", StringType(), False),
            StructField("score", IntegerType(), False),
        ]
    )
)
ID 用户 分数
一个 1
b 能源部 2
C 卡尔 3
d CJ 4
e 汤姆 5

我编写了一个 UDF 来计算 percentile_score,它基于整个分数列。它正在工作,正在生成一个名为 percentile_score 的新列:

from pyspark.sql.functions import udf, collect_list


def calculate_percentile(user_score, score_list):
    data_prs_score.sort()
    scores_count = len(score_list)

    cumulative_frequency = 0
    frequency = 0
    for score in score_list:
        if score == user_score:
            frequency += 1
        elif score > user_score:
            break

        cumulative_frequency += 1

    return (cumulative_frequency - (0.5 * frequency)) / scores_count


def make_score_list(score_list):
     return udf(lambda user_score: calculate_percentile(user_score, score_list), FloatType())
SCORE.withColumn('percentile_score', make_prs_score_list(SCORE.select(collect_list('score')).collect()[0][0])(col('score'))).show()

我的问题是,此功能需要 1 小时才能运行。

我认为花费这么长时间的原因是我在 UDF 上使用了 collect() 。但是,我看不到另一种构建方式。

所以我想知道我可以在这里做什么样的优化。

标签: pythonapache-sparkpyspark

解决方案


您可以使用percent_rank计算每行的百分位数:

from pyspark.sql import functions as F

SCORE.withColumn("percentile_score", 
  F.percent_rank().over(Window.orderBy("score"))) \
  .show()

印刷

+---+----+-----+----------------+                                               
| id|user|score|percentile_score|
+---+----+-----+----------------+
|  a| Joe|    1|             0.0|
|  b| Doe|    2|            0.25|
|  c|Carl|    3|             0.5|
|  d|  CJ|    4|            0.75|
|  e| Tom|    5|             1.0|
+---+----+-----+----------------+

但是会出现以下警告:

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

就像警告所说的那样,所有数据都将被收集到一个分区中,因此您(暂时)失去了 Spark 的并行性。但很有可能这种方法仍然比 UDF 快。


推荐阅读