首页 > 解决方案 > 包含字典的 pyspark 数据框列的总和

问题描述

我有一个数据框,其中仅包含一列,其中包含 type 的元素MapType(StringType(), IntegerType())。我想获得该列的累积总和,其中该sum操作意味着添加两个字典。

最小的例子

a = [{'Maps': ({'a': 1, 'b': 2, 'c': 3})}, {'Maps': ({'a': 2, 'b': 4, 'd': 6})}]
df = spark.createDataFrame(a)
df.show(5, False)

+---------------------------+
|Maps                       |
+---------------------------+
|Map(a -> 1, b -> 2, c -> 3)|
|Map(a -> 2, b -> 4, d -> 6)|
+---------------------------+

如果我要获得 column 的累积总和Maps,我应该得到以下结果。

+-----------------------------------+
|Maps                               |
+-----------------------------------+
|Map(a -> 3, b -> 6, c -> 3, d -> 6)|
+-----------------------------------+

PS 我使用的是 Python 2.6,所以collections.Counter不可用。如果绝对必要,我可能会安装它。

我的尝试:

我尝试了一种accumulator基于方法和一种使用fold.

累加器

def addDictFun(x):
    global v
    v += x

class DictAccumulatorParam(AccumulatorParam):
    def zero(self, d):
        return d
    def addInPlace(self, d1, d2):
        for k in d1:
            d1[k] = d1[k] + (d2[k] if k in d2 else 0)
        for k in d2:
            if k not in d1:
                d1[k] = d2[k]
        return d1

v = sc.accumulator(MapType(StringType(), IntegerType()), DictAccumulatorParam())
cumsum_dict = df.rdd.foreach(addDictFun)

现在最后,我应该将结果字典放在v. 相反,我得到的错误MapType是不可迭代的(主要是for k in d1在函数中的行addInPlace)。

rdd.fold

基于rdd.fold的方法如下:

def add_dicts(d1, d2):
    for k in d1:
        d1[k] = d1[k] + (d2[k] if k in d2 else 0)
    for k in d2:
        if k not in d1:
            d1[k] = d2[k]
    return d1

cumsum_dict = df.rdd.fold(MapType(StringType(), IntegerType()), add_dicts)

但是,我在MapType is not iterable这里遇到同样的错误。知道我哪里出错了吗?

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


pyspark.sql.types是模式描述符,而不是集合或外部语言表示,因此不能与foldor一起使用Accumulator

最直接的解决方案是explode聚合

from pyspark.sql.functions import explode

df = spark.createDataFrame(
    [{'a': 1, 'b': 2, 'c': 3}, {'a': 2, 'b': 4, 'd': 6}], 
    "map<string,integer>"
).toDF("Maps")

df.select(explode("Maps")).groupBy("key").sum("value").rdd.collectAsMap()
# {'d': 6, 'c': 3, 'b': 6, 'a': 3}  

RDD可以做类似的事情:

from operator import add

df.rdd.flatMap(lambda row: row.Maps.items()).reduceByKey(add).collectAsMap()
# {'b': 6, 'c': 3, 'a': 3, 'd': 6}

或者如果你真的想要fold

from operator import attrgetter
from collections import defaultdict

def merge(acc, d):
    for k in d:
        acc[k] += d[k]
    return acc

df.rdd.map(attrgetter("Maps")).fold(defaultdict(int), merge)
# defaultdict(int, {'a': 3, 'b': 6, 'c': 3, 'd': 6})

推荐阅读