首页 > 解决方案 > 在 pyspark 中加入超过公共列的 RDD

问题描述

我的 RDD 看起来像这样:

myrdd = sc.parallelize([('a','b'),('a','c'),('b','c'),('c','d')])

我想从我的 RDD 中打印行的第二个元素和任何其他行的第一个元素是常见的行。所以,我想做自我加入。

我想要的结果是:

a,b,c因为a,bb,c都出现在我的 RDD 中。

我正在使用该join功能

result = myrdd.join(myrdd)

但它给了我

[('b', ('c', 'c')),
 ('c', ('d', 'd')),
 ('a', ('b', 'b')),
 ('a', ('b', 'c')),
 ('a', ('c', 'b')),
 ('a', ('c', 'c'))]

我究竟做错了什么?

编辑:所需的输出:(a,b,c)(a,c,b)(但不是两者)

标签: pysparkrddself-join

解决方案


您可以对结果使用过滤器来排除键的两个值不相同的结果。

myrdd = sc.parallelize([('a','b'),('a','c'),('b','c'),('c','d')])

result = myrdd.join(myrdd)

#Output 

[('a', ('b', 'b')), ('a', ('b', 'c')), ('a', ('c', 'b')), ('a', ('c', 'c')), ('b', ('c', 'c')), ('c', ('d', 'd'))]

a = result.filter(lambda x:x[1][0] !=x[1][1])

#Output 

[('a', ('b', 'c')), ('a', ('c', 'b'))]

您还可以结合这两种操作:

myrdd.join(myrdd).filter(lambda x:x[1][0] !=x[1][1]).collect()

推荐阅读