python - UDF 函数需要很长时间
问题描述
我有一个这样的数据框:
SCORE = spark.createDataFrame(
[
('a', "Joe", 1),
('b', "Doe", 2),
('c', "Carl", 3),
('d', "CJ", 4),
('e', "Tom", 5),
],
StructType(
[
StructField("id", StringType(), False),
StructField("user", StringType(), False),
StructField("score", IntegerType(), False),
]
)
)
ID | 用户 | 分数 |
---|---|---|
一个 | 乔 | 1 |
b | 能源部 | 2 |
C | 卡尔 | 3 |
d | CJ | 4 |
e | 汤姆 | 5 |
我编写了一个 UDF 来计算 percentile_score,它基于整个分数列。它正在工作,正在生成一个名为 percentile_score 的新列:
from pyspark.sql.functions import udf, collect_list
def calculate_percentile(user_score, score_list):
data_prs_score.sort()
scores_count = len(score_list)
cumulative_frequency = 0
frequency = 0
for score in score_list:
if score == user_score:
frequency += 1
elif score > user_score:
break
cumulative_frequency += 1
return (cumulative_frequency - (0.5 * frequency)) / scores_count
def make_score_list(score_list):
return udf(lambda user_score: calculate_percentile(user_score, score_list), FloatType())
SCORE.withColumn('percentile_score', make_prs_score_list(SCORE.select(collect_list('score')).collect()[0][0])(col('score'))).show()
我的问题是,此功能需要 1 小时才能运行。
我认为花费这么长时间的原因是我在 UDF 上使用了 collect() 。但是,我看不到另一种构建方式。
所以我想知道我可以在这里做什么样的优化。
解决方案
您可以使用percent_rank计算每行的百分位数:
from pyspark.sql import functions as F
SCORE.withColumn("percentile_score",
F.percent_rank().over(Window.orderBy("score"))) \
.show()
印刷
+---+----+-----+----------------+
| id|user|score|percentile_score|
+---+----+-----+----------------+
| a| Joe| 1| 0.0|
| b| Doe| 2| 0.25|
| c|Carl| 3| 0.5|
| d| CJ| 4| 0.75|
| e| Tom| 5| 1.0|
+---+----+-----+----------------+
但是会出现以下警告:
WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
就像警告所说的那样,所有数据都将被收集到一个分区中,因此您(暂时)失去了 Spark 的并行性。但很有可能这种方法仍然比 UDF 快。
推荐阅读
- swagger-2.0 - 从 openapi 生成的模型可以有自定义和多个路径吗?
- python - 完成屏幕测量后生成新屏幕,无需重启树莓派
- pdf2htmlex - 转换过程中的 pdf2htmlEX 错误 - CMap 无效并因字体而被删除
- html - 我自己的网站需要多长时间才能完全更新并显示我所做的新更改?
- python - 我的条目的 stringvar 无法正常工作
- java - 在SD卡中写入文件时权限被拒绝
- php - Macos Apache , 403 禁止
- java - 使用 Roundingparam 设置CornerRadius,视图不会抗锯齿
- ruby - 运行同步作业时拒绝访问 S3 存储桶
- postgresql - 获取索引列总大小