python - PySpark 两个值的总和
问题描述
给定以下示例数据框:
advertiser_id| name | amount | total |max_total_advertiser|
4061 |source1|-434.955284|-354882.75336200005| -355938.53950700007
4061 |source2|-594.012216|-355476.76557800005| -355938.53950700007
4061 |source3|-461.773929|-355938.53950700007| -355938.53950700007
我需要将金额和max_total_advertiser字段相加,以便在每一行中获得正确的总值。考虑到我需要按广告商 ID 划分的每个组的总价值。(初始数据框中的总列不正确,这就是我要正确计算的原因)
应该是这样的:
w = Window.partitionBy("advertiser_id").orderBy("advertiser_id")
df.withColumn("total_aux", when( lag("advertiser_id").over(w) == col("advertiser_id"), lag("total_aux").over(w) + col("amount") ).otherwise( col("max_total_advertiser") + col("amount") ))
这lag("total_aux")
不起作用,因为该列尚未生成,这就是我想要实现的,如果它是组中的第一行,则将同一行中的列求和,如果没有将先前获得的值与当前金额字段相加。示例输出:
advertiser_id| name | amount | total_aux |
4061 |source1|-434.955284|-356373.494791 |
4061 |source2|-594.012216|-356967.507007 |
4061 |source3|-461.773929|-357429.280936 |
谢谢。
解决方案
我假设这name
对于每个都是一个不同的值,advertiser_id
因此您的数据集可以按name
. 我还假设max_total_advertiser
每个advertiser_id
. 如果其中一个不是这种情况,请添加评论。
您需要的是一个rangeBetween窗口,它为您提供指定范围内的所有前后行。我们将使用Window.unboundedPreceding
我们想要总结所有以前的值。
import pyspark.sql.functions as F
from pyspark.sql import Window
l = [
(4061, 'source1',-434.955284,-354882.75336200005, -355938.53950700007),
(4061, 'source2',-594.012216,-355476.76557800005, -345938.53950700007),
(4062, 'source1',-594.012216,-355476.76557800005, -5938.53950700007),
(4062, 'source2',-594.012216,-355476.76557800005, -5938.53950700007),
(4061, 'source3',-461.773929,-355938.53950700007, -355938.53950700007)
]
columns = ['advertiser_id','name' ,'amount', 'total', 'max_total_advertiser']
df=spark.createDataFrame(l, columns)
w = Window.partitionBy('advertiser_id').orderBy('name').rangeBetween(Window.unboundedPreceding, 0)
df = df.withColumn('total', F.sum('amount').over(w) + df.max_total_advertiser)
df.show()
输出:
+-------------+-------+-----------+-------------------+--------------------+
|advertiser_id| name| amount| total|max_total_advertiser|
+-------------+-------+-----------+-------------------+--------------------+
| 4062|source1|-594.012216|-6532.5517230000705| -5938.53950700007|
| 4062|source2|-594.012216| -7126.563939000071| -5938.53950700007|
| 4061|source1|-434.955284| -356373.4947910001| -355938.53950700007|
| 4061|source2|-594.012216| -346967.5070070001| -345938.53950700007|
| 4061|source3|-461.773929|-357429.28093600005| -355938.53950700007|
+-------------+-------+-----------+-------------------+--------------------+
推荐阅读
- django - 如何在单独的 React App 中从 django 获取 CSRF
- php - 扩展类变量只返回 NULL
- java - 如何在 intelliJ 中将 Java 项目运行到服务器
- swift - SwiftUI 将数据从协调器中获取到内容视图中
- python - Pyqt5 鼠标事件不适用于我的自定义标签栏
- react-native - 如何在 React Native 中将选项卡插入到文本中
- google-sheets - 如何使用查询检索/“选择”日期
- java - 延迟后缓存 Mono<>
- powershell - 检查登录脚本的 AD 用户
- python - 如何在 Django 上将 Amazon S3 与 Pillow 一起使用时修复“TypeError:预期的字符串或类似字节的对象”