apache-spark - Converting a UDF into Spark SQL functions
Problem description
I have the following DataFrame historical_trends:
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+
|localSymbol_drop|end_window_drop |detect_DRM|last_detect_price_DRM|index|correct_list_last_detect_price_DRM_without_NA |
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+
|BABA |2021-06-15 16:36:30|NO |null |1 |[] |
|BABA |2021-06-15 16:37:00|NO |null |2 |[] |
|BABA |2021-06-15 16:37:30|YES |211.85 |3 |[211.85] |
|BABA |2021-06-15 16:38:00|NO |211.85 |4 |[211.85] |
|BABA |2021-06-15 16:38:30|NO |211.85 |5 |[211.85] |
|BABA |2021-06-15 16:39:00|NO |211.85 |6 |[211.85] |
|BABA |2021-06-15 16:39:30|NO |211.85 |7 |[211.85] |
|BABA |2021-06-15 16:40:00|NO |211.85 |8 |[211.85] |
|BABA |2021-06-15 16:40:30|NO |211.85 |9 |[211.85] |
|BABA |2021-06-15 16:41:00|YES |211.91 |10 |[211.85, 211.91] |
|BABA |2021-06-15 16:41:30|NO |211.91 |11 |[211.85, 211.91] |
|BABA |2021-06-15 16:42:00|NO |211.91 |12 |[211.85, 211.91] |
|BABA |2021-06-15 16:42:30|YES |211.83 |13 |[211.85, 211.91, 211.83] |
|BABA |2021-06-15 16:43:00|NO |211.83 |14 |[211.85, 211.91, 211.83] |
|BABA |2021-06-15 16:43:30|YES |211.75 |15 |[211.85, 211.91, 211.83, 211.75] |
|BABA |2021-06-15 16:44:00|NO |211.75 |16 |[211.85, 211.91, 211.83, 211.75] |
|BABA |2021-06-15 16:44:30|NO |211.75 |17 |[211.85, 211.91, 211.83, 211.75] |
|BABA |2021-06-15 16:45:00|NO |211.75 |18 |[211.85, 211.91, 211.83, 211.75] |
|BABA |2021-06-15 16:45:30|YES |211.72 |19 |[211.85, 211.91, 211.83, 211.75, 211.72] |
|BABA |2021-06-15 16:46:00|NO |211.72 |20 |[211.85, 211.91, 211.83, 211.75, 211.72] |
|BABA |2021-06-15 16:46:30|NO |211.72 |21 |[211.85, 211.91, 211.83, 211.75, 211.72] |
|BABA |2021-06-15 16:47:00|NO |211.72 |22 |[211.85, 211.91, 211.83, 211.75, 211.72] |
|BABA |2021-06-15 16:47:30|YES |211.81 |23 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81] |
|BABA |2021-06-15 16:48:00|NO |211.81 |24 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81] |
|BABA |2021-06-15 16:48:30|NO |211.81 |25 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81] |
|BABA |2021-06-15 16:49:00|YES |211.93 |26 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|
|BABA |2021-06-15 16:49:30|NO |211.93 |27 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|
|BABA |2021-06-15 16:50:00|NO |211.93 |28 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+
Now I compute the column start_data_analysis_window with the following UDF, remove_from_list_udf. Two columns are passed to it as arguments: one contains a number and the other an array. The function loops over the array and, whenever the number is less than or equal to the current element, that element is appended to an auxiliary list, which is then returned.
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType

def remove_from_list(last_detect_price_DRM, correct_list_last_detect_price_DRM_without_NA):
    # Keep only the elements that are greater than or equal to last_detect_price_DRM
    final_list = []
    for i in correct_list_last_detect_price_DRM_without_NA:
        try:
            if last_detect_price_DRM <= float(i):
                final_list.append(i)
        except TypeError:
            # Skip comparisons that fail, e.g. when last_detect_price_DRM is null
            pass
    return final_list

remove_from_list_udf = f.udf(remove_from_list, ArrayType(StringType()))

historical_trends = historical_trends\
    .withColumn(
        "start_data_analysis_window",
        remove_from_list_udf(f.col('last_detect_price_DRM'), f.col('correct_list_last_detect_price_DRM_without_NA'))
    )\
    .withColumn(
        'number_DRM', f.size(f.col('start_data_analysis_window'))
    )
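For illustration, here is how remove_from_list behaves on the row with index 23 from the table above, as a plain-Python check with no Spark involved (the string elements are an assumption based on the float(i) conversion and the ArrayType(StringType()) return type):

# Hypothetical local check mirroring the row with index 23:
# only values greater than or equal to 211.81 are kept.
print(remove_from_list(211.81, ['211.85', '211.91', '211.83', '211.75', '211.72', '211.81']))
# ['211.85', '211.91', '211.83', '211.81']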
The result is exactly what I expected, so I'm happy with it:
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+----------------------------------------+----------+
|localSymbol_drop|end_window_drop |detect_DRM|last_detect_price_DRM|index|correct_list_last_detect_price_DRM_without_NA |start_data_analysis_window |number_DRM|
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+----------------------------------------+----------+
|BABA |2021-06-15 16:36:30|NO |null |1 |[] |[] |0 |
|BABA |2021-06-15 16:37:00|NO |null |2 |[] |[] |0 |
|BABA |2021-06-15 16:37:30|YES |211.85 |3 |[211.85] |[211.85] |1 |
|BABA |2021-06-15 16:38:00|NO |211.85 |4 |[211.85] |[211.85] |1 |
|BABA |2021-06-15 16:38:30|NO |211.85 |5 |[211.85] |[211.85] |1 |
|BABA |2021-06-15 16:39:00|NO |211.85 |6 |[211.85] |[211.85] |1 |
|BABA |2021-06-15 16:39:30|NO |211.85 |7 |[211.85] |[211.85] |1 |
|BABA |2021-06-15 16:40:00|NO |211.85 |8 |[211.85] |[211.85] |1 |
|BABA |2021-06-15 16:40:30|NO |211.85 |9 |[211.85] |[211.85] |1 |
|BABA |2021-06-15 16:41:00|YES |211.91 |10 |[211.85, 211.91] |[211.91] |1 |
|BABA |2021-06-15 16:41:30|NO |211.91 |11 |[211.85, 211.91] |[211.91] |1 |
|BABA |2021-06-15 16:42:00|NO |211.91 |12 |[211.85, 211.91] |[211.91] |1 |
|BABA |2021-06-15 16:42:30|YES |211.83 |13 |[211.85, 211.91, 211.83] |[211.85, 211.91, 211.83] |3 |
|BABA |2021-06-15 16:43:00|NO |211.83 |14 |[211.85, 211.91, 211.83] |[211.85, 211.91, 211.83] |3 |
|BABA |2021-06-15 16:43:30|YES |211.75 |15 |[211.85, 211.91, 211.83, 211.75] |[211.85, 211.91, 211.83, 211.75] |4 |
|BABA |2021-06-15 16:44:00|NO |211.75 |16 |[211.85, 211.91, 211.83, 211.75] |[211.85, 211.91, 211.83, 211.75] |4 |
|BABA |2021-06-15 16:44:30|NO |211.75 |17 |[211.85, 211.91, 211.83, 211.75] |[211.85, 211.91, 211.83, 211.75] |4 |
|BABA |2021-06-15 16:45:00|NO |211.75 |18 |[211.85, 211.91, 211.83, 211.75] |[211.85, 211.91, 211.83, 211.75] |4 |
|BABA |2021-06-15 16:45:30|YES |211.72 |19 |[211.85, 211.91, 211.83, 211.75, 211.72] |[211.85, 211.91, 211.83, 211.75, 211.72]|5 |
|BABA |2021-06-15 16:46:00|NO |211.72 |20 |[211.85, 211.91, 211.83, 211.75, 211.72] |[211.85, 211.91, 211.83, 211.75, 211.72]|5 |
|BABA |2021-06-15 16:46:30|NO |211.72 |21 |[211.85, 211.91, 211.83, 211.75, 211.72] |[211.85, 211.91, 211.83, 211.75, 211.72]|5 |
|BABA |2021-06-15 16:47:00|NO |211.72 |22 |[211.85, 211.91, 211.83, 211.75, 211.72] |[211.85, 211.91, 211.83, 211.75, 211.72]|5 |
|BABA |2021-06-15 16:47:30|YES |211.81 |23 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81] |[211.85, 211.91, 211.83, 211.81] |4 |
|BABA |2021-06-15 16:48:00|NO |211.81 |24 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81] |[211.85, 211.91, 211.83, 211.81] |4 |
|BABA |2021-06-15 16:48:30|NO |211.81 |25 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81] |[211.85, 211.91, 211.83, 211.81] |4 |
|BABA |2021-06-15 16:49:00|YES |211.93 |26 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|[211.93] |1 |
|BABA |2021-06-15 16:49:30|NO |211.93 |27 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|[211.93] |1 |
|BABA |2021-06-15 16:50:00|NO |211.93 |28 |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|[211.93] |1 |
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+----------------------------------------+----------+
Now, my question is whether there is any way to replace the UDF with built-in Spark SQL functions.
If you don't understand the question, please ask for clarification instead of downvoting; that also helps me improve the way I ask questions ;)
Thanks!
Solution
I suggest you read up and learn more about higher-order functions.
historical_trends = historical_trends \
    .withColumn(
        "start_data_analysis_window",
        f.expr("filter(correct_list_last_detect_price_DRM_without_NA, value -> last_detect_price_DRM <= value)")
    )
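As a follow-up, here is a minimal sketch that also rebuilds number_DRM with size and makes the comparison explicitly numeric with a cast (the cast is an assumption that the array elements are stored as strings, as the UDF's ArrayType(StringType()) return type suggests):

# Sketch only: the cast to double assumes string elements;
# drop it if the array already holds numeric values.
historical_trends = historical_trends \
    .withColumn(
        "start_data_analysis_window",
        f.expr("""
            filter(
                correct_list_last_detect_price_DRM_without_NA,
                value -> last_detect_price_DRM <= cast(value as double)
            )
        """)
    ) \
    .withColumn(
        'number_DRM', f.size(f.col('start_data_analysis_window'))
    )

Because filter runs entirely inside the SQL engine, this avoids the Python serialization round-trip that the UDF incurs, which is usually the main reason to prefer the built-in higher-order functions.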