python - 包含字典的 pyspark 数据框列的总和
问题描述
我有一个数据框,其中仅包含一列,其中包含 type 的元素MapType(StringType(), IntegerType())
。我想获得该列的累积总和,其中该sum
操作意味着添加两个字典。
最小的例子
a = [{'Maps': ({'a': 1, 'b': 2, 'c': 3})}, {'Maps': ({'a': 2, 'b': 4, 'd': 6})}]
df = spark.createDataFrame(a)
df.show(5, False)
+---------------------------+
|Maps |
+---------------------------+
|Map(a -> 1, b -> 2, c -> 3)|
|Map(a -> 2, b -> 4, d -> 6)|
+---------------------------+
如果我要获得 column 的累积总和Maps
,我应该得到以下结果。
+-----------------------------------+
|Maps |
+-----------------------------------+
|Map(a -> 3, b -> 6, c -> 3, d -> 6)|
+-----------------------------------+
PS 我使用的是 Python 2.6,所以collections.Counter
不可用。如果绝对必要,我可能会安装它。
我的尝试:
我尝试了一种accumulator
基于方法和一种使用fold
.
累加器
def addDictFun(x):
global v
v += x
class DictAccumulatorParam(AccumulatorParam):
def zero(self, d):
return d
def addInPlace(self, d1, d2):
for k in d1:
d1[k] = d1[k] + (d2[k] if k in d2 else 0)
for k in d2:
if k not in d1:
d1[k] = d2[k]
return d1
v = sc.accumulator(MapType(StringType(), IntegerType()), DictAccumulatorParam())
cumsum_dict = df.rdd.foreach(addDictFun)
现在最后,我应该将结果字典放在v
. 相反,我得到的错误MapType
是不可迭代的(主要是for k in d1
在函数中的行addInPlace
)。
rdd.fold
基于rdd.fold
的方法如下:
def add_dicts(d1, d2):
for k in d1:
d1[k] = d1[k] + (d2[k] if k in d2 else 0)
for k in d2:
if k not in d1:
d1[k] = d2[k]
return d1
cumsum_dict = df.rdd.fold(MapType(StringType(), IntegerType()), add_dicts)
但是,我在MapType is not iterable
这里遇到同样的错误。知道我哪里出错了吗?
解决方案
pyspark.sql.types
是模式描述符,而不是集合或外部语言表示,因此不能与fold
or一起使用Accumulator
。
最直接的解决方案是explode
聚合
from pyspark.sql.functions import explode
df = spark.createDataFrame(
[{'a': 1, 'b': 2, 'c': 3}, {'a': 2, 'b': 4, 'd': 6}],
"map<string,integer>"
).toDF("Maps")
df.select(explode("Maps")).groupBy("key").sum("value").rdd.collectAsMap()
# {'d': 6, 'c': 3, 'b': 6, 'a': 3}
你RDD
可以做类似的事情:
from operator import add
df.rdd.flatMap(lambda row: row.Maps.items()).reduceByKey(add).collectAsMap()
# {'b': 6, 'c': 3, 'a': 3, 'd': 6}
或者如果你真的想要fold
from operator import attrgetter
from collections import defaultdict
def merge(acc, d):
for k in d:
acc[k] += d[k]
return acc
df.rdd.map(attrgetter("Maps")).fold(defaultdict(int), merge)
# defaultdict(int, {'a': 3, 'b': 6, 'c': 3, 'd': 6})
推荐阅读
- vb.net - 我的代码有问题 vb aatk = matk += 300
- python - 创建一个长度为零的cython数组
- reactjs - 上下文 API HOC 单元测试
- javascript - 从对象列表中逐行渲染表格
- c# - 在 C# 中以异步形式建立条件
- javascript - 由于标头而导致滚动偏移javascript的问题
- firebase - 将 DataSnapshot 转换为自定义结构对象 - Swift 4
- javascript - 从单独的 JS 文件访问数据响应项(控制台错误)
- javascript - CSS垂直对齐部分标签中的文本?
- javascript - 如何在 MySQL Nodejs 中建立一对多关系?