dataframe - pyspark 数据帧总和
问题描述
我正在尝试执行以下操作pyspark.sql.dataframe
from pyspark.sql.functions import sum as spark_sum
df = spark.createDataFrame([
('a', 1.0, 1.0), ('a',1.0, 0.2), ('b', 1.0, 1.0),
('c' ,1.0, 0.5), ('d', 0.55, 1.0),('e', 1.0, 1.0)
])
>>> df.show()
+---+----+---+
| _1| _2| _3|
+---+----+---+
| a| 1.0|1.0|
| a| 1.0|0.2|
| b| 1.0|1.0|
| c| 1.0|0.5|
| d|0.55|1.0|
| e| 1.0|1.0|
+---+----+---+
然后,我正在尝试执行以下操作。
df[_2]
1)当列>时选择行df[_3]
2) 对于从上面选择的每一行,乘以df[_2] * df[_3]
,然后取它们的总和
3)将上面的结果除以列的总和df[_3]
这是我所做的:
>>> filter_df = df.where(df['_2'] > df['_3'])
>>> filter_df.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| a|1.0|0.2|
| c|1.0|0.5|
+---+---+---+
>>> result = spark_sum(filter_df['_2'] * filter_df['_3'])
/ spark_sum(filter_df['_3'])
>>> df.select(result).show()
+--------------------------+
|(sum((_2 * _3)) / sum(_3))|
+--------------------------+
| 0.9042553191489361|
+--------------------------+
但答案应该是 (1.0 * 0.2 + 1.0 * 0.5) / (0.2+0.5) = 1.0 这是不正确的。什么??
在我看来,这样的操作只对原版进行df
,而不是filter_df
。怎么回事?
解决方案
您需要在 filter_df 中调用它。
>>> result = spark_sum(filter_df['_2'] * filter_df['_3'])
/ spark_sum(filter_df['_3'])
这是一个转换函数,它返回一列并应用于我们应用它的数据帧(惰性评估)。Sum 是一个聚合函数,当在没有任何组的情况下调用时,它适用于整个数据集。
>>> filter_df.select(result).show()
+--------------------------+
|(sum((_2 * _3)) / sum(_3))|
+--------------------------+
| 1.0|
+--------------------------+
推荐阅读
- python - 在排序一个列表期间将不同列表中的相关元素保持在一起
- c# - 如何对通用列表对象进行排序
- javascript - 向和从 js 服务器发送数据的基本方法?
- json - 使用 Pyspark 处理 JSON 结构
- amazon-web-services - 为什么角色假设应该在 lambda 内部完成?
- html - 如何将数值绑定到 CSS 以创建时间线?
- docker - 如何为 docker compose 环境变量设置运行时变量
- javascript - 仅在 Javascript 中具有不同段落的动态 Div
- tsql - 表格模型中的测量列未在浏览器中显示正确的值
- javascript - 调用此函数时如何获取返回值?