首页 > 解决方案 > Pyspark RDD 以不同方式聚合不同的值字段

问题描述

这是一个非常开放的问题,但我有一个这种格式的 RDD。

[('2014-06', ('131313', 5.5, 6.5, 7.5, 10.5 )),
('2014-07', ('246655', 636636.53, .53252, 5252.112, 5242.23)),
('2014-06', ('131232', 1, 2, 4.5, 5.5)),
('2014-07', ('131322464', 536363.6363, 536336.6363, 3563.63636, 9.6464464646464646))]

我想通过键对每个值进行不同的分组和聚合。例如,对于键,'2014-06' 我想获取第一个值字段的计数,即键'131313'的其他字段的平均值。5.5, 6.5, 7.5, 10.5'2014-06'

因此,上述 key 的简单示例的结果'2014-06'将是('2014-06', (2, 3.25, 5.5, 8)).

对 RDD 执行此操作的最佳方法是什么?我不能使用任何 Spark SQL 表达式或函数,只能使用 RDD 函数。

我正在考虑用 mapValues 做一些事情并使用其他一些函数,但我在制定这个函数时遇到了一些麻烦。

我知道这个问题是非常开放的,所以如果您还有其他问题,请告诉我。

感谢您的时间。

标签: pythonapache-sparkpysparkaggregaterdd

解决方案


@jxc 解决方案可以满足您的需求,但这是另一种方法。

您可以使用aggregateByKey. 这个函数有两个函数seqFunccombFunc一个叫做中性零值的累加器值。

zero_value = (0, 0, 0, 0, 0)
d = rdd.aggregateByKey(zero_value, lambda x, y: (1, *y[1:]),
                       lambda x, y: tuple(map(add, x, y))
                       ) \
    .mapValues(lambda v: (v[0], *[i / v[0] for i in v[1:]])) \

第一个 lambda 表达式通过将第一个字符串字段替换为整数1(计数一次)来转换每个值。第二个 lambda 表达式通过添加两个列表来合并两个值。

在此聚合之后,我们只需将每个值列表的元素除以给出平均值的第一个元素。

输出:

[('2014-06', (2, 3.25, 4.25, 6.0, 8.0)), ('2014-07', (2, 586500.0831500001, 268168.58441, 4407.87418, 2625.938223232323))]

推荐阅读