apache-spark - 每行的大约上一年
问题描述
我有一个包含以下示例行的数据框:
Product Date Revenue
A 2021-05-10 20
A 2021-03-20 10
A 2020-01-10 5
A 2020-03-10 6
A 2020-04-10 7
对于每个产品和日期,我想从其原始日期获得最接近去年的日期。例如,第一行的日期是 2021-05-10,与上一年最接近的日期是 2020-04-10。我想要的结果输出如下:
Product Date Revenue PrevDate PrevRevenue
A 2021-05-10 20 2020-04-10 7
A 2021-03-20 10 2020-03-10 6
A 2020-01-10 5 null null
A 2020-03-10 6 null null
A 2020-04-10 7 null null
解决方案
说df
是你的数据框
data = [['A', '2021-05-10', 20],
['A', '2021-03-20', 10],
['A', '2020-01-10', 5],
['A', '2020-03-10', 6],
['A', '2020-04-10', 7]]
df = spark.createDataFrame(data, "Product:string, Date:string, Revenue:long")
df.show()
# +-------+----------+-------+
# |Product| Date|Revenue|
# +-------+----------+-------+
# | A|2021-05-10| 20|
# | A|2021-03-20| 10|
# | A|2020-01-10| 5|
# | A|2020-03-10| 6|
# | A|2020-04-10| 7|
# +-------+----------+-------+
然后您可以使用函数获取一年前-今天的日期,将数据框add_months
与自身连接以获得date-last_year的组合,使用row_number
函数在按last_year和prevDate之间的天数排序的窗口上对 prevDate 进行排名,然后过滤以获取最近的日期。
from pyspark.sql.functions import col, add_months, row_number, datediff
from pyspark.sql.window import Window
df = (df
.withColumn('last_year', add_months(col('Date'), -12))
.join(df.selectExpr('Product pr', 'Date prevDate', 'Revenue prevRevenue'),
[col('Product') == col('pr'), col('last_year') > col('prevDate')],
'left')
.withColumn('closest', row_number().over(Window
.partitionBy('product', 'date')
.orderBy(datediff(col('last_year'), col('prevDate')))))
.filter('closest = 1')
.drop(*['pr', 'closest'])
)
df.show()
# +-------+----------+-------+----------+----------+-----------+
# |Product| Date|Revenue| last_year| prevDate|prevRevenue|
# +-------+----------+-------+----------+----------+-----------+
# | A|2020-01-10| 5|2019-01-10| null| null|
# | A|2020-03-10| 6|2019-03-10| null| null|
# | A|2020-04-10| 7|2019-04-10| null| null|
# | A|2021-03-20| 10|2020-03-20|2020-03-10| 6|
# | A|2021-05-10| 20|2020-05-10|2020-04-10| 7|
# +-------+----------+-------+----------+----------+-----------+
推荐阅读
- typescript - ES5 声明文件
- python - 如何在没有numpy的情况下将列表转换为矩阵?
- tkinter - 如果 cn 是条目,为什么我不能使用 while cn
- ajax - 我必须使用 ajax 完成实时表,但我坚持一个错误,即 CSRF 令牌不匹配错误
- vega-lite - Vega 在点击时执行排序(转换)
- postgresql - symfony 教义将布尔值转换为整数
- docker - 如何从旧的 Docker 映像中提取模块并添加到新的 Docker 映像中?
- c# - Ajax 如何传递具有文件列表的数组(.NET CORE / AJAX / JQUERY)
- php - PHP计算没有WHERE子句的列
- rust - 如何过滤掉 proc_macro_derive 中所有 Option 类型的字段