apache-spark - List is still unordered even after using a window with collect_set
Problem description
I'm trying to collect a set of dates from a dataframe. The problem I'm facing is that the dates do not come out in order.
Example dataframe (this is from a much larger dataset; basically, this dataframe tracks the
beginning date of the week for every single day in a year):
+--------+-----------+----------+
|year_num|week_beg_dt| cal_dt|
+--------+-----------+----------+
| 2013| 2012-12-31|2012-12-31|
| 2013| 2012-12-31|2013-01-03|
| 2013| 2013-01-07|2013-01-07|
| 2013| 2013-01-07|2013-01-12|
| 2013| 2013-01-14|2013-01-14|
| 2013| 2013-01-14|2013-01-15|
| 2014| 2014-01-01|2014-01-01|
| 2014| 2014-01-01|2014-01-05|
| 2014| 2014-01-07|2014-01-07|
| 2014| 2014-01-07|2014-01-12|
| 2014| 2014-01-15|2014-01-15|
|    2014| 2014-01-15|2014-01-16|
+--------+-----------+----------+
What I'm trying to get to is this:
+--------+-------------------------------------+
|year_num|dates                                |
+--------+-------------------------------------+
|    2013|[2012-12-31, 2013-01-07, 2013-01-14] |
|    2014|[2014-01-01, 2014-01-07, 2014-01-15] |
+--------+-------------------------------------+
I have tried using a window to do this, since collect_set with a plain groupBy results in an unordered set:
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.partitionBy('year_num').orderBy('week_beg_dt')
business_days_ = df2.withColumn('dates', F.collect_set('week_beg_dt').over(w)) \
.groupBy('year_num') \
.agg(F.max('dates').alias('dates')) \
.collect()
But I still get an unordered set. Any suggestions on what I'm doing wrong and how to fix it?
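For reference, the desired result can be modeled in plain Python (no Spark needed) as a de-duplicated, sorted list of week_beg_dt values per year_num. A minimal sketch of the target semantics, using the sample rows above:

```python
from collections import defaultdict

# (year_num, week_beg_dt) pairs taken from the sample dataframe above
rows = [
    ("2013", "2012-12-31"), ("2013", "2012-12-31"),
    ("2013", "2013-01-07"), ("2013", "2013-01-07"),
    ("2013", "2013-01-14"), ("2013", "2013-01-14"),
    ("2014", "2014-01-01"), ("2014", "2014-01-01"),
    ("2014", "2014-01-07"), ("2014", "2014-01-07"),
    ("2014", "2014-01-15"), ("2014", "2014-01-15"),
]

# group, de-duplicate, then sort: this is what the Spark job should compute
dates_by_year = defaultdict(set)
for year, week_beg in rows:
    dates_by_year[year].add(week_beg)
result = {year: sorted(dates) for year, dates in dates_by_year.items()}
print(result)
```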
Solution
For Spark 2.4+, use the built-in array_sort function on the result of collect_set to get an ordered list.
Example:
from pyspark.sql.functions import array_sort, collect_set, col
df1.show()
#+--------+-----------+----------+
#|year_num|week_beg_dt| cal_dt|
#+--------+-----------+----------+
#| 2013| 2012-12-31|2012-12-31|
#| 2013| 2012-12-31|2012-12-31|
#| 2013| 2013-01-07|2013-01-03|
#+--------+-----------+----------+
#without array_sort
df1.groupBy("year_num").agg(collect_set(col("week_beg_dt"))).show(10,False)
#+--------+------------------------+
#|year_num|collect_set(week_beg_dt)|
#+--------+------------------------+
#|2013 |[2013-01-07, 2012-12-31]|
#+--------+------------------------+
#using array_sort
df1.groupBy("year_num").agg(array_sort(collect_set(col("week_beg_dt")))).show(10,False)
#+--------+------------------------------------+
#|year_num|array_sort(collect_set(week_beg_dt))|
#+--------+------------------------------------+
#|2013 |[2012-12-31, 2013-01-07] |
#+--------+------------------------------------+
For earlier versions of Spark:
from pyspark.sql.functions import udf, collect_set, col
from pyspark.sql.types import ArrayType, StringType
#udf to sort the array
sort_arr_udf = udf(lambda x: sorted(x), ArrayType(StringType()))
df1.groupBy("year_num").agg(sort_arr_udf(collect_set(col("week_beg_dt")))).show(10,False)
#+--------+----------------------------------------+
#|year_num|<lambda>(collect_set(week_beg_dt, 0, 0))|
#+--------+----------------------------------------+
#|2013 |[2012-12-31, 2013-01-07] |
#+--------+----------------------------------------+
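Note that both the UDF above and array_sort compare these dates as strings, which is safe here only because zero-padded ISO-8601 (yyyy-MM-dd) strings sort lexicographically in the same order as the dates they represent. A quick pure-Python check:

```python
# Zero-padded ISO-8601 date strings: lexicographic order matches chronological order
dates = ["2013-01-14", "2012-12-31", "2013-01-07"]
chronological = sorted(dates)
print(chronological)

# Caveat: this property breaks for non-padded formats such as "2013-1-7",
# so keep week_beg_dt either as a DateType or as a zero-padded string.
```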