Changing a UDF into Spark SQL functions

Problem description

I have the following DataFrame, historical_trends:

+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+
|localSymbol_drop|end_window_drop    |detect_DRM|last_detect_price_DRM|index|correct_list_last_detect_price_DRM_without_NA           |
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+
|BABA            |2021-06-15 16:36:30|NO        |null                 |1    |[]                                                      |
|BABA            |2021-06-15 16:37:00|NO        |null                 |2    |[]                                                      |
|BABA            |2021-06-15 16:37:30|YES       |211.85               |3    |[211.85]                                                |
|BABA            |2021-06-15 16:38:00|NO        |211.85               |4    |[211.85]                                                |
|BABA            |2021-06-15 16:38:30|NO        |211.85               |5    |[211.85]                                                |
|BABA            |2021-06-15 16:39:00|NO        |211.85               |6    |[211.85]                                                |
|BABA            |2021-06-15 16:39:30|NO        |211.85               |7    |[211.85]                                                |
|BABA            |2021-06-15 16:40:00|NO        |211.85               |8    |[211.85]                                                |
|BABA            |2021-06-15 16:40:30|NO        |211.85               |9    |[211.85]                                                |
|BABA            |2021-06-15 16:41:00|YES       |211.91               |10   |[211.85, 211.91]                                        |
|BABA            |2021-06-15 16:41:30|NO        |211.91               |11   |[211.85, 211.91]                                        |
|BABA            |2021-06-15 16:42:00|NO        |211.91               |12   |[211.85, 211.91]                                        |
|BABA            |2021-06-15 16:42:30|YES       |211.83               |13   |[211.85, 211.91, 211.83]                                |
|BABA            |2021-06-15 16:43:00|NO        |211.83               |14   |[211.85, 211.91, 211.83]                                |
|BABA            |2021-06-15 16:43:30|YES       |211.75               |15   |[211.85, 211.91, 211.83, 211.75]                        |
|BABA            |2021-06-15 16:44:00|NO        |211.75               |16   |[211.85, 211.91, 211.83, 211.75]                        |
|BABA            |2021-06-15 16:44:30|NO        |211.75               |17   |[211.85, 211.91, 211.83, 211.75]                        |
|BABA            |2021-06-15 16:45:00|NO        |211.75               |18   |[211.85, 211.91, 211.83, 211.75]                        |
|BABA            |2021-06-15 16:45:30|YES       |211.72               |19   |[211.85, 211.91, 211.83, 211.75, 211.72]                |
|BABA            |2021-06-15 16:46:00|NO        |211.72               |20   |[211.85, 211.91, 211.83, 211.75, 211.72]                |
|BABA            |2021-06-15 16:46:30|NO        |211.72               |21   |[211.85, 211.91, 211.83, 211.75, 211.72]                |
|BABA            |2021-06-15 16:47:00|NO        |211.72               |22   |[211.85, 211.91, 211.83, 211.75, 211.72]                |
|BABA            |2021-06-15 16:47:30|YES       |211.81               |23   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81]        |
|BABA            |2021-06-15 16:48:00|NO        |211.81               |24   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81]        |
|BABA            |2021-06-15 16:48:30|NO        |211.81               |25   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81]        |
|BABA            |2021-06-15 16:49:00|YES       |211.93               |26   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|
|BABA            |2021-06-15 16:49:30|NO        |211.93               |27   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|
|BABA            |2021-06-15 16:50:00|NO        |211.93               |28   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+

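I currently compute the column start_data_analysis_window with the UDF below, remove_from_list_udf. It takes two columns as arguments: last_detect_price_DRM, which holds a number, and correct_list_last_detect_price_DRM_without_NA, which holds an array. The UDF loops over the array and collects into a helper list every element for which the number is less than or equal to that element (that is, every element greater than or equal to the number), then returns that list. number_DRM is the size of the returned list.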

from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType

def remove_from_list(last_detect_price_DRM, correct_list_last_detect_price_DRM_without_NA):
    # Keep every element that is greater than or equal to last_detect_price_DRM
    final_list = []
    for i in correct_list_last_detect_price_DRM_without_NA:
        try:
            if last_detect_price_DRM <= float(i):
                final_list.append(i)
        except TypeError:
            # Raised when last_detect_price_DRM or the element is null (None)
            pass
    return final_list

remove_from_list_udf = f.udf(remove_from_list, ArrayType(StringType()))

historical_trends = historical_trends \
    .withColumn(
        "start_data_analysis_window",
        remove_from_list_udf(f.col('last_detect_price_DRM'), f.col('correct_list_last_detect_price_DRM_without_NA'))
    ) \
    .withColumn(
        'number_DRM', f.size(f.col('start_data_analysis_window'))
    )
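To pin down exactly what any replacement has to reproduce, here is the UDF's per-row rule written as plain Python, without Spark. The function name keep_at_or_above is hypothetical; it replaces the try/except with an explicit null check and, like the UDF, calls float() on each element in case the array holds strings. The sample inputs are taken from rows 10, 13 and 23 of the output table shown next.

```python
from typing import List, Optional, Sequence, Union

Number = Union[str, float]

def keep_at_or_above(last_price: Optional[float], prices: Sequence[Number]) -> List[Number]:
    """Return the elements of `prices` that are >= last_price, preserving order.

    Mirrors the UDF test `last_price <= float(element)`. A null (None)
    last_price yields an empty list, which is what the UDF's
    `except TypeError` branch ended up returning.
    """
    if last_price is None:
        return []
    return [p for p in prices if last_price <= float(p)]

# Row 10: only 211.91 is >= 211.91
print(keep_at_or_above(211.91, [211.85, 211.91]))
# Row 23: 211.75 and 211.72 are dropped; number_DRM is the length (4)
row_23 = keep_at_or_above(211.81, [211.85, 211.91, 211.83, 211.75, 211.72, 211.81])
print(row_23, len(row_23))
```

The rule keeps elements at or above the last detected price, so a Spark SQL rewrite needs a `>=` comparison, not `<=`.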

The result is exactly what I expect:

+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+----------------------------------------+----------+
|localSymbol_drop|end_window_drop    |detect_DRM|last_detect_price_DRM|index|correct_list_last_detect_price_DRM_without_NA           |start_data_analysis_window              |number_DRM|
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+----------------------------------------+----------+
|BABA            |2021-06-15 16:36:30|NO        |null                 |1    |[]                                                      |[]                                      |0         |
|BABA            |2021-06-15 16:37:00|NO        |null                 |2    |[]                                                      |[]                                      |0         |
|BABA            |2021-06-15 16:37:30|YES       |211.85               |3    |[211.85]                                                |[211.85]                                |1         |
|BABA            |2021-06-15 16:38:00|NO        |211.85               |4    |[211.85]                                                |[211.85]                                |1         |
|BABA            |2021-06-15 16:38:30|NO        |211.85               |5    |[211.85]                                                |[211.85]                                |1         |
|BABA            |2021-06-15 16:39:00|NO        |211.85               |6    |[211.85]                                                |[211.85]                                |1         |
|BABA            |2021-06-15 16:39:30|NO        |211.85               |7    |[211.85]                                                |[211.85]                                |1         |
|BABA            |2021-06-15 16:40:00|NO        |211.85               |8    |[211.85]                                                |[211.85]                                |1         |
|BABA            |2021-06-15 16:40:30|NO        |211.85               |9    |[211.85]                                                |[211.85]                                |1         |
|BABA            |2021-06-15 16:41:00|YES       |211.91               |10   |[211.85, 211.91]                                        |[211.91]                                |1         |
|BABA            |2021-06-15 16:41:30|NO        |211.91               |11   |[211.85, 211.91]                                        |[211.91]                                |1         |
|BABA            |2021-06-15 16:42:00|NO        |211.91               |12   |[211.85, 211.91]                                        |[211.91]                                |1         |
|BABA            |2021-06-15 16:42:30|YES       |211.83               |13   |[211.85, 211.91, 211.83]                                |[211.85, 211.91, 211.83]                |3         |
|BABA            |2021-06-15 16:43:00|NO        |211.83               |14   |[211.85, 211.91, 211.83]                                |[211.85, 211.91, 211.83]                |3         |
|BABA            |2021-06-15 16:43:30|YES       |211.75               |15   |[211.85, 211.91, 211.83, 211.75]                        |[211.85, 211.91, 211.83, 211.75]        |4         |
|BABA            |2021-06-15 16:44:00|NO        |211.75               |16   |[211.85, 211.91, 211.83, 211.75]                        |[211.85, 211.91, 211.83, 211.75]        |4         |
|BABA            |2021-06-15 16:44:30|NO        |211.75               |17   |[211.85, 211.91, 211.83, 211.75]                        |[211.85, 211.91, 211.83, 211.75]        |4         |
|BABA            |2021-06-15 16:45:00|NO        |211.75               |18   |[211.85, 211.91, 211.83, 211.75]                        |[211.85, 211.91, 211.83, 211.75]        |4         |
|BABA            |2021-06-15 16:45:30|YES       |211.72               |19   |[211.85, 211.91, 211.83, 211.75, 211.72]                |[211.85, 211.91, 211.83, 211.75, 211.72]|5         |
|BABA            |2021-06-15 16:46:00|NO        |211.72               |20   |[211.85, 211.91, 211.83, 211.75, 211.72]                |[211.85, 211.91, 211.83, 211.75, 211.72]|5         |
|BABA            |2021-06-15 16:46:30|NO        |211.72               |21   |[211.85, 211.91, 211.83, 211.75, 211.72]                |[211.85, 211.91, 211.83, 211.75, 211.72]|5         |
|BABA            |2021-06-15 16:47:00|NO        |211.72               |22   |[211.85, 211.91, 211.83, 211.75, 211.72]                |[211.85, 211.91, 211.83, 211.75, 211.72]|5         |
|BABA            |2021-06-15 16:47:30|YES       |211.81               |23   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81]        |[211.85, 211.91, 211.83, 211.81]        |4         |
|BABA            |2021-06-15 16:48:00|NO        |211.81               |24   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81]        |[211.85, 211.91, 211.83, 211.81]        |4         |
|BABA            |2021-06-15 16:48:30|NO        |211.81               |25   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81]        |[211.85, 211.91, 211.83, 211.81]        |4         |
|BABA            |2021-06-15 16:49:00|YES       |211.93               |26   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|[211.93]                                |1         |
|BABA            |2021-06-15 16:49:30|NO        |211.93               |27   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|[211.93]                                |1         |
|BABA            |2021-06-15 16:50:00|NO        |211.93               |28   |[211.85, 211.91, 211.83, 211.75, 211.72, 211.81, 211.93]|[211.93]                                |1         |
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------+----------------------------------------+----------+

My question: is there a way to replace this UDF with built-in Spark SQL functions?

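If anything in the question is unclear, please ask for clarification rather than downvoting, so I can also get better at asking questions ;)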

Thanks!

Tags: apache-spark, pyspark, apache-spark-sql

Solution


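I suggest reading up on Spark's higher-order functions. The SQL function filter (usable through f.expr since Spark 2.4) keeps the array elements for which the lambda returns true, so it can replace the UDF's loop directly: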

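from pyspark.sql import functions as f

# Keep elements >= last_detect_price_DRM (the UDF's condition); the cast covers string elements.
# A null last_detect_price_DRM makes every comparison null, so filter() returns an empty array.
historical_trends = historical_trends \
    .withColumn(
        "start_data_analysis_window",
        f.expr("filter(correct_list_last_detect_price_DRM_without_NA, value -> cast(value AS double) >= last_detect_price_DRM)")
    ) \
    .withColumn("number_DRM", f.size("start_data_analysis_window"))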
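The SQL version needs no try/except because Spark SQL uses three-valued logic: a comparison such as `value >= last_detect_price_DRM` evaluates to null when either side is null, and filter keeps an element only when the predicate is true. The plain-Python model below illustrates those two rules; the helpers sql_ge and sql_filter are hypothetical and are not Spark's implementation. It reproduces the empty arrays in the first rows of the table, where last_detect_price_DRM is null.

```python
from typing import Callable, List, Optional

def sql_ge(a: Optional[float], b: Optional[float]) -> Optional[bool]:
    """Model of SQL `a >= b`: a null (None) operand makes the result null."""
    if a is None or b is None:
        return None
    return a >= b

def sql_filter(values: Optional[List[Optional[float]]],
               pred: Callable[[Optional[float]], Optional[bool]]) -> Optional[List[Optional[float]]]:
    """Model of SQL filter(): a null array stays null, and an element is kept
    only when the predicate is exactly true (a null result counts as not true)."""
    if values is None:
        return None
    return [v for v in values if pred(v) is True]

# Null last_detect_price_DRM: every comparison is null, so nothing is kept
print(sql_filter([211.85, 211.91], lambda v: sql_ge(v, None)))
# Row 10 of the table: only 211.91 is >= 211.91
print(sql_filter([211.85, 211.91], lambda v: sql_ge(v, 211.91)))
```

The same reasoning explains why a null element inside the array is silently dropped instead of raising an error.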
