python - Pyspark:扩展 pyspark 数据框,添加缺失的句点
问题描述
我有一个 pyspark 数据框,如下所示。
+-----+----+------+------+-----+-----+
| id|year|period| val_1|val_2|val_3|
+-----+----+------+------+-----+-----+
|94734|2020| 5|160000| 0| 0|
|39066|2011| 1| 20000| 0| 0|
|66198|2013| 1| 22000| 0| 0|
|89691|2015| 5|150000| 0| 0|
|11653|2010| 1| 20000| 0| 0|
+-----+----+------+------+-----+-----+
我正在尝试通过给出一个年份并添加缺失的年份来将上述数据框扩展为以下数据框。
+-----+----+-----------+------+-----+-----+-----+
| id|year|year_period|period|val_1|val_2|val_3|
+-----+----+-----------+------+-----+-----+-----+
|94734|2020| 2020-2021| 1|32000| 0| 0|
|94734|2021| 2021-2022| 1|32000| 0| 0|
|94734|2022| 2022-2023| 1|32000| 0| 0|
|94734|2023| 2023-2024| 1|32000| 0| 0|
|94734|2024| 2024-2025| 1|32000| 0| 0|
|39066|2011| 2011-2012| 1|20000| 0| 0|
|66198|2013| 2013-2014| 1|22000| 0| 0|
|89691|2015| 2015-2016| 1|30000| 0| 0|
|89691|2016| 2016-2017| 1|30000| 0| 0|
|89691|2017| 2017-2018| 1|30000| 0| 0|
|89691|2018| 2018-2019| 1|30000| 0| 0|
|89691|2019| 2019-2020| 1|30000| 0| 0|
|11653|2010| 2010-2011| 1|20000| 0| 0|
| |2012| 2012-2013| 1| 0| 0| 0|
| |2014| 2014-2015| 1| 0| 0| 0|
+-----+----+-----------+------+-----+-----+-----+
我开始尝试下面的代码。
import pyspark.sql.functions as F
cond1 = F.col("period") > 1
new_df = df.withColumn('period', F.expr('explode(array_repeat(period,int(period)))'))
new_df = new_df.withColumn("val_1", F.when(cond1, F.col("val_1")/F.col("period")).otherwise(F.col("val_1")))
new_df = new_df.withColumn("val_2", F.when(cond1, F.col("val_2")/F.col("period")).otherwise(F.col("val_2")))
new_df = new_df.withColumn("val_3", F.when(cond1, F.col("val_3")/F.col("period")).otherwise(F.col("val_3")))
new_df.show()
+-----+----+------+-------+-----+-----+
| id|year|period| val_1|val_2|val_3|
+-----+----+------+-------+-----+-----+
|94734|2020| 5|32000.0| 0| 0|
|94734|2020| 5|32000.0| 0| 0|
|94734|2020| 5|32000.0| 0| 0|
|94734|2020| 5|32000.0| 0| 0|
|94734|2020| 5|32000.0| 0| 0|
|39066|2011| 1|20000.0| 0| 0|
|66198|2013| 1|22000.0| 0| 0|
|89691|2015| 5|30000.0| 0| 0|
|89691|2015| 5|30000.0| 0| 0|
|89691|2015| 5|30000.0| 0| 0|
|89691|2015| 5|30000.0| 0| 0|
|89691|2015| 5|30000.0| 0| 0|
|11653|2010| 1|20000.0| 0| 0|
+-----+----+------+-------+-----+-----+
但我不知道如何继续得到预期的结果。如果有人可以提供帮助,那就太好了。谢谢你。
解决方案
我相信您已经对您的解决方案做好了准备,这是我为达到所需输出而编写的代码
import pyspark.sql.functions as F
from pyspark.sql.window import Window
new_df = (
df.withColumn("period", F.expr("explode(array_repeat(period,int(period)))"))
.withColumn(
"year",
F.col("year")
+ F.row_number().over(Window.partitionBy("id").orderBy(F.lit(1)))
- 1,
)
.withColumn(
"val_1",
F.when(cond1, F.col("val_1") / F.col("period")).otherwise(F.col("val_1")),
)
.withColumn(
"val_2",
F.when(cond1, F.col("val_2") / F.col("period")).otherwise(F.col("val_2")),
)
.withColumn(
"val_3",
F.when(cond1, F.col("val_3") / F.col("period")).otherwise(F.col("val_3")),
)
.withColumn("period", F.lit(1))
.withColumn("year_period", F.concat_ws("-", F.col("year"), F.col("year") + 1))
)
new_df.show()
+-----+----+------+-------+-----+-----+-----------+
| id|year|period| val_1|val_2|val_3|year_period|
+-----+----+------+-------+-----+-----+-----------+
|11653|2010| 1|20000.0| 0.0| 0.0| 2010-2011|
|39066|2011| 1|20000.0| 0.0| 0.0| 2011-2012|
|66198|2013| 1|22000.0| 0.0| 0.0| 2013-2014|
|89691|2015| 1|30000.0| 0.0| 0.0| 2015-2016|
|89691|2016| 1|30000.0| 0.0| 0.0| 2016-2017|
|89691|2017| 1|30000.0| 0.0| 0.0| 2017-2018|
|89691|2018| 1|30000.0| 0.0| 0.0| 2018-2019|
|89691|2019| 1|30000.0| 0.0| 0.0| 2019-2020|
|94734|2020| 1|32000.0| 0.0| 0.0| 2020-2021|
|94734|2021| 1|32000.0| 0.0| 0.0| 2021-2022|
|94734|2022| 1|32000.0| 0.0| 0.0| 2022-2023|
|94734|2023| 1|32000.0| 0.0| 0.0| 2023-2024|
|94734|2024| 1|32000.0| 0.0| 0.0| 2024-2025|
+-----+----+------+-------+-----+-----+-----------+
推荐阅读
- groovy - 如何在 Spock 中为测试套件设置和清理资源
- javascript - Angular 5 回调和 that=this
- yeoman - 约曼哟命令
- firebase - 我无法使用 Firebase 云功能将 sk_test 键替换为 Stripe 上的 sk_live 键
- c# - Windows 窗体 - 当有多个屏幕时如何重新定位窗口对象
- android - system-images 有巨大的空间
- python - 如何在 python 中使用 tf-idf svm sklearn 绘制文本分类
- java - 杰克逊错误Java EnumMap
- swift - 构建 Vapor 3 框架时出现 Xcode 错误
- php - 不依赖顺序解析 JSON