python - Pyspark应用不同的基于reduce函数的键
问题描述
假设我有一些看起来像这样的数据
data =[('yes_sum', np.array([2, 2, 2])),
('yes_sum', np.array([3, 3, 3])),
('no_sum', np.array([4, 4, 4])),
('no_sum', np.array([6, 6, 6]))]
我将其转换为rdd。
rdd_data = sc.parallelize(data)
我想将数组与键相加,'yes_sum'
但将数组与键合并'no_sum'
在一起。所以它看起来像这样:
[('yes_sum', array([5, 5, 5])), ('no_sum', array([4, 4, 4, 6, 6, 6]))]
我只知道如何通过键对数组求和:
rdd_data.reduceByKey(lambda x,y: x + y).collect()
我得到:
[('yes_sum', array([5, 5, 5])), ('no_sum', array([10, 10, 10]))]
但这不是我要找的。我在想这样的事情:
rdd_data.reduceByKey(
lambda x,y: if x.key() == 'yes_sum' x+y else np.concatenate((x, y))
).collect()
解决方案
首先,你的语法:
lambda x,y: if x.key() == 'yes_sum' x+y else np.concatenate((x, y))
是不正确的。相反,你可以写:
lambda x,y: x+y if x.key() == 'yes_sum' else np.concatenate((x, y))
但这将导致:
AttributeError: 'numpy.ndarray' object has no attribute 'key'
当您执行 a时, reduce 函数本身不知道reduceByKey
该部分。key
Spark 已经完成了将来自相似键的数据分组在一起,并将其传递给适当的 reducer 的工作。
为了完成你想要做的事情,你需要filter
在rdd
调用之前reduceByKey
。然后,您可以reduce
根据过滤应用不同的功能,并合并您的结果。
例如:
yes_rdd = rdd_data.filter(lambda x: x[0] == 'yes_sum')\
.reduceByKey(lambda x,y: x + y)
no_rdd = rdd_data.filter(lambda x: x[0] != 'yes_sum')\
.reduceByKey(lambda x,y: np.concatenate((x, y)))
print(yes_rdd.union(no_rdd).collect())
#[('yes_sum', array([5, 5, 5])), ('no_sum', array([4, 4, 4, 6, 6, 6]))]
推荐阅读
- java - 使用 Mockito 验证无参数和私有方法
- javascript - 我如何处理 React 中的未定义错误
- python - 有没有办法限制类变量被对象访问
- python - python help() 打破交互式命令行历史
- javascript - 无法将 Brave 浏览器与 Puppeteer 一起使用
- jquery - 访问最接近的匹配标签的 id
- javascript - 由于 react-native-router-flux 接口组件变得隐藏
- docker - CannotPullContainerError:来自守护程序的错误响应:ECR Repo 映像的拉取访问被拒绝,存储库不存在或可能需要“docker login”
- linux - 15 名 sidekiq 工作人员中有 1 名常驻 (RES) 内存膨胀
- javascript - 在 Sequelize Model 类中键入属性