pyspark - Pyspark 中具有键值对的 AggregateByKey 函数
问题描述
我对 pyspark 中的 aggregatebykey 有疑问。
我有一个 RDD 数据集如下:premierRDD=[('Chelsea', ('2016–2017', 93)), ('Chelsea', ('2015–2016', 50))]
我希望使用 aggegrateByKey 函数总结 50 和 93 的分数,我的预期输出应该是:[('Chelsea', '2016-2017', (93,143)), ('Chelsea', '2015-2016' , (50,143))]
seqFunc = (lambda x, y: ('', x[0] + y[1]))
combFunc = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
premierAgg = premierMap.aggregateByKey((0,0), seqFunc, combFunc)
但是,我得到了这个输出: [('Chelsea', ('', 143))]
有人可以建议我如何正确使用 aggregrateByKey 函数吗?
解决方案
我调整了您的代码以达到预期的效果。首先,您需要在 seqFunc 中维护“年份”值。因此我在y[0]
那里添加。然后必须将组合更改为不仅包含总和,还包含元组中的原始值。此外,年份值也保持不变。正如我在评论中解释的那样,这将导致[('Chelsea', [(u'2016-2017', (93, 143)), (u'2015-2016', (50, 143))])]
相同的键将被组合。要实现 2 次 chelsea 的输出,您只需使用所述的附加地图功能即可。
rdd = sc.parallelize([('Chelsea', (u"2016-2017", 93)), ('Chelsea', (u"2015-2016", 50))])
seqFunc = (lambda x, y: (y[0], x[0] + y[1]))
combFunc = (lambda x, y: [(x[0], (x[1],x[1] + y[1])),(y[0],(y[1],x[1]+y[1]))])
premierAgg = rdd.aggregateByKey((0,0), seqFunc,combFunc)
print premierAgg.map(lambda r: [(r[0], a) for a in r[1]]).collect()[0]
输出:
[('Chelsea', (u'2016-2017', (93, 143))), ('Chelsea', (u'2015-2016', (50, 143)))]
推荐阅读
- dataframe - Shinyapp io 服务器上的应用程序不会在表格中显示最新日期,但在本地计算机上它可以工作
- javascript - 无法传递参数反应原生 5.x
- javascript - 有什么办法可以让代码更短
- autohotkey - Autohotkey - 我在使用 WinActive 和 Hotstring 编写代码时遇到问题
- docker - IBM 的 Docker Informix Innovator C 14.10.FC4IE 只允许 8GB DBSpace
- python - Python:插值时间不知道确切的模式
- mysql - Mysql在多行查询中中断长查询
- amazon-web-services - ECS 无法理解内存预留
- c++ - 如何正确调整复杂向量的大小?
- c# - 我在后台工作人员中所做的更新没有出现在数据库中