首页 > 解决方案 > 在 Pyspark 数据框上查找本月至今和月份

问题描述

我在 Spark 中有以下数据框(使用 PySpark):

DT_BORD_REF: 时间戳列,
COUNTRY_ALPHA: 国家 Alpha-3 代码,
working_day_flag: 日期是否为该国家/地区的工作日

我需要添加两个字段:

好像是一个窗口函数的应用,但我想不通

+-------------------+-------------+----------------+
|        DT_BORD_REF|COUNTRY_ALPHA|working_day_flag|
+-------------------+-------------+----------------+
|2021-01-01 00:00:00|          FRA|               N|
|2021-01-01 00:00:00|          ITA|               N|
|2021-01-01 00:00:00|          BRA|               N|
|2021-01-02 00:00:00|          BRA|               N|
|2021-01-02 00:00:00|          FRA|               N|
|2021-01-02 00:00:00|          ITA|               N|
|2021-01-03 00:00:00|          ITA|               N|
|2021-01-03 00:00:00|          BRA|               N|
|2021-01-03 00:00:00|          FRA|               N|
|2021-01-04 00:00:00|          BRA|               Y|
|2021-01-04 00:00:00|          FRA|               Y|
|2021-01-04 00:00:00|          ITA|               Y|
|2021-01-05 00:00:00|          FRA|               Y|
|2021-01-05 00:00:00|          BRA|               Y|
|2021-01-05 00:00:00|          ITA|               Y|
|2021-01-06 00:00:00|          ITA|               N|
|2021-01-06 00:00:00|          FRA|               Y|
|2021-01-06 00:00:00|          BRA|               Y|
|2021-01-07 00:00:00|          ITA|               Y|
+-------------------+-------------+----------------+

标签: sqlapache-sparkpysparkapache-spark-sqlwindow-functions

解决方案


在 Window 函数上使用运行总和。要将窗口限制为一个月和一个国家/地区,请使用COUNTRY_ALPHA和分区DATE_TRUNC(DT_BORD_REF, 'MONTH')。然后使用无界前行和当前行之间的行,您可以获得直到当前日期的工作日总和。相同的逻辑适用于通过使用 1 个关注和无限关注之间的行来获取该月的剩余天数。

要仅过滤带有 的天数working_day_flag = 'Y',请使用带有 的条件总和case/when

这是您在问题中提供的示例数据的工作示例:

df.createOrReplaceTempView("df")

sql_query = """
SELECT
  *,
  SUM(CASE
    WHEN BOOLEAN(working_day_flag) THEN 1
    ELSE 0
  END) OVER (
  PARTITION BY COUNTRY_ALPHA, DATE_TRUNC('MONTH', DT_BORD_REF)
  ORDER BY DT_BORD_REF ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  ) AS month_to_date,

  COALESCE(SUM(CASE
    WHEN BOOLEAN(working_day_flag) THEN 1
    ELSE 0
  END) OVER (
  PARTITION BY COUNTRY_ALPHA, DATE_TRUNC('MONTH', DT_BORD_REF)
  ORDER BY DT_BORD_REF ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING
  ), 0) AS month_to_go

FROM df
""" 

spark.sql(sql_query).show()

#+-------------------+-------------+----------------+-------------+-----------+
#|        DT_BORD_REF|COUNTRY_ALPHA|working_day_flag|month_to_date|month_to_go|
#+-------------------+-------------+----------------+-------------+-----------+
#|2021-01-01 00:00:00|          BRA|               N|            0|          3|
#|2021-01-02 00:00:00|          BRA|               N|            0|          3|
#|2021-01-03 00:00:00|          BRA|               N|            0|          3|
#|2021-01-04 00:00:00|          BRA|               Y|            1|          2|
#|2021-01-05 00:00:00|          BRA|               Y|            2|          1|
#|2021-01-06 00:00:00|          BRA|               Y|            3|          0|
#|2021-01-01 00:00:00|          FRA|               N|            0|          3|
#|2021-01-02 00:00:00|          FRA|               N|            0|          3|
#|2021-01-03 00:00:00|          FRA|               N|            0|          3|
#|2021-01-04 00:00:00|          FRA|               Y|            1|          2|
#|2021-01-05 00:00:00|          FRA|               Y|            2|          1|
#|2021-01-06 00:00:00|          FRA|               Y|            3|          0|
#|2021-01-01 00:00:00|          ITA|               N|            0|          3|
#|2021-01-02 00:00:00|          ITA|               N|            0|          3|
#|2021-01-03 00:00:00|          ITA|               N|            0|          3|
#|2021-01-04 00:00:00|          ITA|               Y|            1|          2|
#|2021-01-05 00:00:00|          ITA|               Y|            2|          1|
#|2021-01-06 00:00:00|          ITA|               N|            2|          1|
#|2021-01-07 00:00:00|          ITA|               Y|            3|          0|

推荐阅读