首页 > 解决方案 > Pyspark应用不同的基于reduce函数的键

问题描述

假设我有一些看起来像这样的数据

data =[('yes_sum', np.array([2, 2, 2])),
 ('yes_sum', np.array([3, 3, 3])),
 ('no_sum', np.array([4, 4, 4])),
 ('no_sum', np.array([6, 6, 6]))]

我将其转换为rdd。

rdd_data = sc.parallelize(data)

我想将数组与键相加,'yes_sum'但将数组与键合并'no_sum'在一起。所以它看起来像这样:

[('yes_sum', array([5, 5, 5])), ('no_sum', array([4, 4, 4, 6, 6, 6]))]

我只知道如何通过键对数组求和:

rdd_data.reduceByKey(lambda x,y: x + y).collect()

我得到:

[('yes_sum', array([5, 5, 5])), ('no_sum', array([10, 10, 10]))]

但这不是我要找的。我在想这样的事情:

rdd_data.reduceByKey(
    lambda x,y: if x.key() == 'yes_sum' x+y else np.concatenate((x, y))
).collect() 

标签: pythonapache-sparkpysparkrdd

解决方案


首先,你的语法:

lambda x,y: if x.key() == 'yes_sum' x+y else np.concatenate((x, y))

是不正确的。相反,你可以写:

lambda x,y: x+y if x.key() == 'yes_sum' else np.concatenate((x, y))

但这将导致:

AttributeError: 'numpy.ndarray' object has no attribute 'key'

当您执行 a时, reduce 函数本身不知道reduceByKey该部分。keySpark 已经完成了将来自相似键的数据分组在一起,并将其传递给适当的 reducer 的工作。

为了完成你想要做的事情,你需要filterrdd调用之前reduceByKey。然后,您可以reduce根据过滤应用不同的功能,并合并您的结果。

例如:

yes_rdd = rdd_data.filter(lambda x: x[0] == 'yes_sum')\
    .reduceByKey(lambda x,y: x + y)

no_rdd = rdd_data.filter(lambda x: x[0] != 'yes_sum')\
    .reduceByKey(lambda x,y: np.concatenate((x, y)))

print(yes_rdd.union(no_rdd).collect())
#[('yes_sum', array([5, 5, 5])), ('no_sum', array([4, 4, 4, 6, 6, 6]))]

推荐阅读