pyspark - 在 pyspark 中加入超过公共列的 RDD
问题描述
我的 RDD 看起来像这样:
myrdd = sc.parallelize([('a','b'),('a','c'),('b','c'),('c','d')])
我想从我的 RDD 中打印行的第二个元素和任何其他行的第一个元素是常见的行。所以,我想做自我加入。
我想要的结果是:
a,b,c
因为a,b
和b,c
都出现在我的 RDD 中。
我正在使用该join
功能
result = myrdd.join(myrdd)
但它给了我
[('b', ('c', 'c')),
('c', ('d', 'd')),
('a', ('b', 'b')),
('a', ('b', 'c')),
('a', ('c', 'b')),
('a', ('c', 'c'))]
我究竟做错了什么?
编辑:所需的输出:(a,b,c)
或(a,c,b)
(但不是两者)
解决方案
您可以对结果使用过滤器来排除键的两个值不相同的结果。
myrdd = sc.parallelize([('a','b'),('a','c'),('b','c'),('c','d')])
result = myrdd.join(myrdd)
#Output
[('a', ('b', 'b')), ('a', ('b', 'c')), ('a', ('c', 'b')), ('a', ('c', 'c')), ('b', ('c', 'c')), ('c', ('d', 'd'))]
a = result.filter(lambda x:x[1][0] !=x[1][1])
#Output
[('a', ('b', 'c')), ('a', ('c', 'b'))]
您还可以结合这两种操作:
myrdd.join(myrdd).filter(lambda x:x[1][0] !=x[1][1]).collect()
推荐阅读
- javascript - 在按钮 onclick 上调用 javascript 函数
- flutter - 如何在不关闭 AdGuard 的情况下让 Flutter 工作
- php - 通过上传 RDF 文件在 PHP 中可视化 RDF 图
- javascript - 如何以角度更改对象的值
- oracle-apex - 如何删除交互式网格中的过滤器?
- python - ssm-state manager-ansible 剧本
- sql - 如何告诉 BigQuery 在我发送查询时不要“检查”我的查询?我一直找不到表
- google-chrome-extension - chrome 扩展 onHeadersReceived 侦听器重定向位置错误
- node.js - 加密存储在环境变量中的密码
- jquery - 为什么 Tailwindcss 不能覆盖 jQuery mobile css for ?