python - Pyspark RDD 以不同方式聚合不同的值字段
问题描述
这是一个非常开放的问题,但我有一个这种格式的 RDD。
[('2014-06', ('131313', 5.5, 6.5, 7.5, 10.5 )),
('2014-07', ('246655', 636636.53, .53252, 5252.112, 5242.23)),
('2014-06', ('131232', 1, 2, 4.5, 5.5)),
('2014-07', ('131322464', 536363.6363, 536336.6363, 3563.63636, 9.6464464646464646))]
我想通过键对每个值进行不同的分组和聚合。例如,对于键,'2014-06'
我想获取第一个值字段的计数,即键'131313'
的其他字段的平均值。5.5, 6.5, 7.5, 10.5
'2014-06'
因此,上述 key 的简单示例的结果'2014-06'
将是('2014-06', (2, 3.25, 5.5, 8))
.
对 RDD 执行此操作的最佳方法是什么?我不能使用任何 Spark SQL 表达式或函数,只能使用 RDD 函数。
我正在考虑用 mapValues 做一些事情并使用其他一些函数,但我在制定这个函数时遇到了一些麻烦。
我知道这个问题是非常开放的,所以如果您还有其他问题,请告诉我。
感谢您的时间。
解决方案
@jxc 解决方案可以满足您的需求,但这是另一种方法。
您可以使用aggregateByKey
. 这个函数有两个函数seqFunc
,combFunc
一个叫做中性零值的累加器值。
zero_value = (0, 0, 0, 0, 0)
d = rdd.aggregateByKey(zero_value, lambda x, y: (1, *y[1:]),
lambda x, y: tuple(map(add, x, y))
) \
.mapValues(lambda v: (v[0], *[i / v[0] for i in v[1:]])) \
第一个 lambda 表达式通过将第一个字符串字段替换为整数1
(计数一次)来转换每个值。第二个 lambda 表达式通过添加两个列表来合并两个值。
在此聚合之后,我们只需将每个值列表的元素除以给出平均值的第一个元素。
输出:
[('2014-06', (2, 3.25, 4.25, 6.0, 8.0)), ('2014-07', (2, 586500.0831500001, 268168.58441, 4407.87418, 2625.938223232323))]
推荐阅读
- c++ - 你如何一步一步地正确解释这段代码?(编程新手)
- java - 如何在java中输入数组列表并输出总和
- javascript - 样式化组件嵌套组件错误
- python - When using k nearest neighbors, is there a way to retrieve the "neighbors" that are used?
- docker - .Net Core 3.0 与 AWS ECS(docker run -m)
- python - 向多索引数据帧上的每个索引添加一行
- python - Pandas - 如何获得具有正值和负值的列的总和?
- python - Arcmap 脚本不会在 arcmap 控制台中打印消息
- c - 是否有 C 函数来获取文件数据段的大小?
- c++ - 为什么我在这个函数中的 while 循环有问题?