python - 连接具有多个值组件的两个 RDD 并将结果展平
问题描述
我有 2 个具有相同键的 RDD,但值类型不同(超过 2 个值)。我想按键加入这些 RDD,然后将它们的值附加到最后的元组中(见下文)。最好的方法是什么?
rdd1 = sc.parallelize([ (1, "test1", [5,6,7]), (2, "test2", [1,2,3]) ])
rdd2 = sc.parallelize([ (1, "Foo"), (2, "Bar") ])
期望的输出 RDD
[ (1, "Foo", "test1", [5,6,7]), (2, "Bar", "test2", [1,2,3]) ]
直接加入不起作用:
print(rdd2.join(rdd1).collect())
#[(1, ('Foo', 'test1')), (2, ('Bar', 'test2'))]
这将忽略 in 中的其余值,rdd1
并且输出格式错误。
解决方案
您可以在此处使用join
,前提是您首先将 映射rdds
到表单(key, value)
中。
rdd1 = sc.parallelize([ (1, "test1", [5,6,7]), (2, "test2", [1,2,3]) ])
rdd2 = sc.parallelize([ (1, "Foo"), (2, "Bar") ])
def map_to_kvp(row):
if len(row) < 3:
return row
return (row[0], tuple(row[1:]))
rdd3 = rdd2.map(map_to_kvp).join(rdd1.map(map_to_kvp))
print(rdd3.collect())
#[
# (1, ('Foo', ('test1', [5, 6, 7]))),
# (2, ('Bar', ('test2', [1, 2, 3])))
#]
现在您已经在正确的位置获得了所有数据,但您只需要将结果行展平即可。
在这种情况下,您必须编写自己的flatten
函数以避免将string
and变平list
。
我们可以在这个答案的基础上构建如何在不拆分字符串的情况下展平列表?制作自己的功能:
def flatten(foo):
for x in foo:
if hasattr(x, '__iter__') and not isinstance(x, str) and not isinstance(x, list):
for y in flatten(x):
yield y
else:
yield x
rdd4 = rdd3.map(lambda row: tuple(flatten(row)))
print(rdd4.collect())
#[(1, 'Foo', 'test1', [5, 6, 7]), (2, 'Bar', 'test2', [1, 2, 3])]
推荐阅读
- python - 按字符串列表对字典列表进行排序
- reactjs - Rollup 和 typescript 包没有编辑器代码完成并跳转到声明
- amazon-web-services - 在资源声明之外将环境变量附加到 Lambda
- firefox - 使用 policy.json 设置 Firefox 首选项
- r - 用 R + ggplot 在折线图中画一个点
- javascript - 无法取消选中所有复选框
- css - 如何使用 vuetify 和 Vue.delete() 使用效果淡出警报
- ms-access - 文本框中的表达式未将值传递给表
- c# - 查找和替换作业中的值
- chef-infra - 当重试次数大于 0 且 ignore_failure 为真时,是否会重复执行 ruby_block?