python - 在 pySpark 中进行逐列减法的最有效方法
问题描述
我在 Spark 中有一个巨大的 DataFrame,看起来像这样(事实上,它有更多的 Value# 列):
Group Type Value#1 Value#2 Value#3
G1 Revenue 11 22 33
G2 Revenue 12 23 34
G3 Revenue 13 24 35
G4 Revenue 14 25 36
G5 Revenue 15 26 37
G1 Costs 1 1 1
G2 Costs 2 2 2
G3 Costs 3 3 3
G4 Costs 4 4 4
G5 Costs 5 5 5
同样,实际数据框包含 120 个 Value# 列。
我需要为行中的每个组类型和列中的值#计算收入 - 成本。
输出应如下所示:
Group Type Value#1 Value#2 Value#3
G1 Profit 10 11 22
G2 Profit 10 11 22
G3 Profit 10 11 22
G4 Profit 10 11 22
G5 Profit 10 11 22
PS 我正在使用来自 Python 的 Spark 2.1。
谢谢!
解决方案
只需根据Type
列将您的 DataFrame 一分为二。然后加入两个过滤的DataFrame并做减法:
import pyspark.sql.functions as f
value_columns = [c for c in df.columns if c not in {'Group', 'Type'}]
df.where("Type = 'Revenue'").alias("rev")\
.join(df.where("Type = 'Costs'").alias('cost'), on=["Group"])\
.select(
"Group",
f.lit("Profit").alias("Type"),
*[(f.col("rev."+c)-f.col("cost."+c)).alias(c) for c in value_columns]
)\
.show()
#+-----+------+-------+-------+-------+
#|Group| Type|Value#1|Value#2|Value#3|
#+-----+------+-------+-------+-------+
#| G2|Profit| 10| 21| 32|
#| G3|Profit| 10| 21| 32|
#| G4|Profit| 10| 21| 32|
#| G5|Profit| 10| 21| 32|
#| G1|Profit| 10| 21| 32|
#+-----+------+-------+-------+-------+
推荐阅读
- amazon-web-services - 使用代码授予的简单 Cognito 用户身份验证不起作用
- tensorflow - tensorflow keras 保存和加载模型
- sql-server - (T-SQL) 夏令时是否发生在最后一小时?
- c# - C# PasswordDeriveBytes:似乎 Salt 无关紧要
- dynamics-crm - 了解 FetchXML 中的链接实体
- ios - 为什么没有状态栏的新 iPad 上的 safeAreaInsets 和 safeAreaLayoutGuide 看起来不对?
- typescript - 如何检查我的属性是否存在于类型中
- python - 来自多个模型的 Django 过滤器
- mongodb - 通过 MongoDB (docker) 进行 CAS 授权
- javascript - 如何让服务器在普通的 PHP / SQL 网站后面执行任务?