python - How to select columns with a dynamically built select query using window functions
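Question
I have a sample input dataframe like the one below, but the number of value columns (the columns whose names start with m) can be arbitrary, from m1 up to m_n.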
customer_id|month_id|m1 |m2 |m3 .......m_n
1001 | 01 |10 |20
1002 | 01 |20 |30
1003 | 01 |30 |40
1001 | 02 |40 |50
1002 | 02 |50 |60
1003 | 02 |60 |70
1001 | 03 |70 |80
1002 | 03 |80 |90
1003 | 03 |90 |100
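Now I need to create new columns that hold the cumulative sum of each value column across months, so I used a window function. Because I will have n columns, instead of calling withColumn in a for loop I need to build a query or list dynamically and pass it to selectExpr to compute the new columns.
For example: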
rownum_window = (Window.partitionBy("customer_id").orderBy("month_id").rangeBetween(Window.unboundedPreceding, 0))
df = df.select("*", F.sum(col("m1")).over(rownum_window).alias("n1"))
However, I want to prepare the expressions dynamically and then pass them to the dataframe select. How can I do that?
Something like: expr = ['F.sum(col("m1")).over(rownum_window).alias("n1")', 'F.sum(col("m2")).over(rownum_window).alias("n2")', 'F.sum(col("m3")).over(rownum_window).alias("n3")', .......]
df = df.select("*", expr)
Or is there some other way to build the select expressions for a dataframe select?
Expected output:
customer_id|month_id|m1 |m2 |n1 |n2
1001 | 01 |10 |20 |10 |20
1002 | 01 |20 |30 |20 |30
1003 | 01 |30 |40 |30 |40
1001 | 02 |40 |50 |50 |70
1002 | 02 |50 |60 |70 |90
1003 | 02 |60 |70 |90 |110
1001 | 03 |70 |80 |120 |150
1002 | 03 |80 |90 |150 |180
1003 | 03 |90 |100 |180 |210
Answer
With a slight modification of @Lamanus's suggestion, the following code may help solve your problem:
# pyspark --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 1
from pyspark.sql import Row
from pyspark.sql.functions import avg, col, sum  # note: this sum shadows Python's built-in sum
from pyspark.sql.window import Window
drow = Row("customer_id", "month_id", "m1", "m2", "m3", "m4")
# All values are strings here, so Spark casts them implicitly when aggregating.
data = [
    drow("1001", "01", "10", "20", "10", "20"),
    drow("1002", "01", "20", "30", "20", "30"),
    drow("1003", "01", "30", "40", "30", "40"),
    drow("1001", "02", "40", "50", "40", "50"),
    drow("1002", "02", "50", "60", "50", "60"),
    drow("1003", "02", "60", "70", "60", "70"),
    drow("1001", "03", "70", "80", "70", "80"),
    drow("1002", "03", "80", "90", "80", "90"),
    drow("1003", "03", "90", "100", "90", "100"),
]
df = spark.createDataFrame(data)
df.show()
'''
+-----------+--------+---+---+---+---+
|customer_id|month_id| m1| m2| m3| m4|
+-----------+--------+---+---+---+---+
| 1001| 01| 10| 20| 10| 20|
| 1002| 01| 20| 30| 20| 30|
| 1003| 01| 30| 40| 30| 40|
| 1001| 02| 40| 50| 40| 50|
| 1002| 02| 50| 60| 50| 60|
| 1003| 02| 60| 70| 60| 70|
| 1001| 03| 70| 80| 70| 80|
| 1002| 03| 80| 90| 80| 90|
| 1003| 03| 90|100| 90|100|
+-----------+--------+---+---+---+---+
'''
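a = ["m1", "m2"]  # columns that get a running sum
b = ["m3", "m4"]  # columns that get a running average
# Per customer, ordered by month: everything from the first row up to the current one.
rownum_window = (Window.partitionBy("customer_id").orderBy("month_id").rangeBetween(Window.unboundedPreceding, Window.currentRow))
# Build the select list from a and b instead of writing each expression by hand.
expr = (["*"]
        + [sum(col(c)).over(rownum_window).alias("sum{}".format(i)) for i, c in enumerate(a, 1)]
        + [avg(col(c)).over(rownum_window).alias("avg{}".format(i)) for i, c in enumerate(b, 1)])
df.select(expr).show()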
'''
+-----------+--------+---+---+---+---+-----+-----+----+----+
|customer_id|month_id| m1| m2| m3| m4| sum1| sum2|avg1|avg2|
+-----------+--------+---+---+---+---+-----+-----+----+----+
| 1003| 01| 30| 40| 30| 40| 30.0| 40.0|30.0|40.0|
| 1003| 02| 60| 70| 60| 70| 90.0|110.0|45.0|55.0|
| 1003| 03| 90|100| 90|100|180.0|210.0|60.0|70.0|
| 1002| 01| 20| 30| 20| 30| 20.0| 30.0|20.0|30.0|
| 1002| 02| 50| 60| 50| 60| 70.0| 90.0|35.0|45.0|
| 1002| 03| 80| 90| 80| 90|150.0|180.0|50.0|60.0|
| 1001| 01| 10| 20| 10| 20| 10.0| 20.0|10.0|20.0|
| 1001| 02| 40| 50| 40| 50| 50.0| 70.0|25.0|35.0|
| 1001| 03| 70| 80| 70| 80|120.0|150.0|40.0|50.0|
+-----------+--------+---+---+---+---+-----+-----+----+----+
'''
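If the value columns are not known in advance, the select list can also be derived from df.columns. The following is only a rough sketch, not part of the original answer: the helper names value_columns, cumulative_sum_columns and cumulative_sum_sql are made up for illustration, and it assumes the value columns are named m followed by digits (so month_id is not picked up by accident). It shows both variants the question mentions, Column objects for select and SQL strings for selectExpr.

```python
import re


def value_columns(columns, prefix="m"):
    """Return the columns named <prefix><digits>, ordered by their numeric suffix."""
    pattern = re.compile(r"^{}(\d+)$".format(re.escape(prefix)))
    matched = []
    for name in columns:
        m = pattern.match(name)
        if m:  # "month_id" does not match, "m1" / "m10" do
            matched.append((int(m.group(1)), name))
    return [name for _, name in sorted(matched)]


def cumulative_sum_sql(columns, partition_col="customer_id", order_col="month_id"):
    """Build SQL strings for DataFrame.selectExpr, one running sum per value column."""
    over = (
        "OVER (PARTITION BY {p} ORDER BY {o} "
        "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"
    ).format(p=partition_col, o=order_col)
    # m1 -> n1, m2 -> n2, ... (assumes the one-letter prefix "m")
    return [
        "SUM({c}) {over} AS n{i}".format(c=c, over=over, i=c[1:])
        for c in value_columns(columns)
    ]


def cumulative_sum_columns(columns, partition_col="customer_id", order_col="month_id"):
    """Build Column expressions for DataFrame.select (needs an active SparkSession)."""
    # Local import: only this function needs Spark. Using the F namespace
    # avoids shadowing Python's built-in sum.
    from pyspark.sql import Window, functions as F

    window = (
        Window.partitionBy(partition_col)
        .orderBy(order_col)
        .rangeBetween(Window.unboundedPreceding, Window.currentRow)
    )
    return [
        F.sum(F.col(c)).over(window).alias("n" + c[1:])
        for c in value_columns(columns)
    ]


# Usage with the df from the answer (requires a running Spark session):
# df.select("*", *cumulative_sum_columns(df.columns)).show()
# df.selectExpr("*", *cumulative_sum_sql(df.columns)).show()
```

Either call should add one n column per m column. In the answer's sample the m columns are strings, so Spark casts them implicitly and the sums come back as doubles (30.0 rather than 30); casting the inputs to an integer type first should give integer sums like those in the question's expected output.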