首页 > 解决方案 > Pyspark 中具有键值对的 AggregateByKey 函数

问题描述

我对 pyspark 中的 aggregatebykey 有疑问。

我有一个 RDD 数据集如下:premierRDD=[('Chelsea', ('2016–2017', 93)), ('Chelsea', ('2015–2016', 50))]

我希望使用 aggegrateByKey 函数总结 50 和 93 的分数,我的预期输出应该是:[('Chelsea', '2016-2017', (93,143)), ('Chelsea', '2015-2016' , (50,143))]

seqFunc = (lambda x, y: ('', x[0] + y[1]))
combFunc = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

premierAgg = premierMap.aggregateByKey((0,0), seqFunc, combFunc)

但是,我得到了这个输出: [('Chelsea', ('', 143))]

有人可以建议我如何正确使用 aggregrateByKey 函数吗?

标签: pysparkaggregate-functions

解决方案


我调整了您的代码以达到预期的效果。首先,您需要在 seqFunc 中维护“年份”值。因此我在y[0]那里添加。然后必须将组合更改为不仅包含总和,还包含元组中的原始值。此外,年份值也保持不变。正如我在评论中解释的那样,这将导致[('Chelsea', [(u'2016-2017', (93, 143)), (u'2015-2016', (50, 143))])]相同的键将被组合。要实现 2 次 chelsea 的输出,您只需使用所述的附加地图功能即可。

rdd = sc.parallelize([('Chelsea', (u"2016-2017", 93)), ('Chelsea', (u"2015-2016", 50))])
seqFunc = (lambda x, y: (y[0], x[0] + y[1]))
combFunc = (lambda x, y: [(x[0], (x[1],x[1] + y[1])),(y[0],(y[1],x[1]+y[1]))])

premierAgg = rdd.aggregateByKey((0,0), seqFunc,combFunc)
print premierAgg.map(lambda r: [(r[0], a) for a in r[1]]).collect()[0]

输出:

[('Chelsea', (u'2016-2017', (93, 143))), ('Chelsea', (u'2015-2016', (50, 143)))]

推荐阅读