matrix - Pyspark:按列加权平均
问题描述
例如,我有一个这样的数据集
test = spark.createDataFrame([
(0, 1, 5, "2018-06-03", "Region A"),
(1, 1, 2, "2018-06-04", "Region B"),
(2, 2, 1, "2018-06-03", "Region B"),
(3, 3, 1, "2018-06-01", "Region A"),
(3, 1, 3, "2018-06-05", "Region A"),
])\
.toDF("orderid", "customerid", "price", "transactiondate", "location")
test.show()
我可以通过以下方式获得客户区域订单计数矩阵
overall_stat = test.groupBy("customerid").agg(count("orderid"))\
.withColumnRenamed("count(orderid)", "overall_count")
temp_result = test.groupBy("customerid").pivot("location").agg(count("orderid")).na.fill(0).join(overall_stat, ["customerid"])
for field in temp_result.schema.fields:
if str(field.name) not in ['customerid', "overall_count", "overall_amount"]:
name = str(field.name)
temp_result = temp_result.withColumn(name, col(name)/col("overall_count"))
temp_result.show()
数据看起来像这样
现在,我想通过 计算加权平均值,overall_count
我该怎么做?
结果应该是(0.66*3+1*1)/4
区域 A 和(0.33*3+1*1)/4
区域 B
我的想法:
当然可以通过将数据转成python/pandas然后做一些计算来实现,但是在什么情况下我们应该使用Pyspark呢?
我可以得到类似的东西
temp_result.agg(sum(col("Region A") * col("overall_count")), sum(col("Region B")*col("overall_count"))).show()
但感觉不太对劲,尤其是如果region
要数很多 s 的话。
解决方案
您可以通过将上述步骤分成多个阶段来实现加权平均。
考虑以下:
Dataframe Name: sales_table
[ total_sales, count_of_orders, location]
[ 50 , 9 , A ]
[ 80 , 4 , A ]
[ 90 , 7 , A ]
计算上述(70)的分组加权平均值分为两个步骤:
- 乘以
sales
_importance
- 聚合
sales_x_count
产品 - 除以
sales_x_count
原来的总和
如果我们在 PySpark 代码中将上述内容分成几个阶段,您可以获得以下内容:
new_sales = sales_table \
.withColumn("sales_x_count", col("total_sales") * col("count_orders")) \
.groupBy("Location") \
.agg(sf.sum("total_sales").alias("sum_total_sales"), \
sf.sum("sales_x_count").alias("sum_sales_x_count")) \
.withColumn("count_weighted_average", col("sum_sales_x_count") / col("sum_total_sales"))
所以......这里真的不需要花哨的UDF(并且可能会减慢你的速度)。
推荐阅读
- javascript - 文本操作:从剪贴板检测替换
- arrays - 如何以角度将数据从 API 接收到组件模型
- powerbi - 修改度量以仅影响矩阵表中的总计
- java - 为什么“while(Scanner.hasNext())”会在 java 中导致 OutOfMemoryError?
- node.js - mongoose 查询 搜索 全部
- jquery - 在 DIV 中包含并动画化通过 AJAX 调用的 .TXT 的内容
- ios - OpenCV2 全局函数移到哪里去了?
- javascript - child_process spawn如何实时捕获python输出
- python - scipy.misc.derivative 用于不均匀的空间点
- reactjs - 如何滚动滚动列表中的特定元素(没有选项没有选择html标签,只有div标签)