首页 > 解决方案 > 如何获得两个 RDD [(String, Iterable[String])] 的交集

问题描述

数据由两列组成

A B
A C
A D
B A
B C
B D
B E
C A
C B
C D
C E
D A
D B
D C
D E
E B
E C
E D

在第一行,可以认为 A 和 B 是朋友等。我如何找到他们共同的朋友?

(A,B) -> (C D)

意思是 A 和 B 有共同的朋友 C 和 D。我和 groupByKey 一样接近,结果如下。

(B,CompactBuffer(A, C, D, E))
(A,CompactBuffer(B, C, D))
(C,CompactBuffer(A, B, D, E))
(E,CompactBuffer(B, C, D))
(D,CompactBuffer(A, B, C, E))

编码:

val rdd: RDD[String] = spark.sparkContext.textFile("twocols.txt")
val splitrdd: RDD[(String, String)] = rdd.map { s =>
  var str = s.split(" ")
  new Tuple2(str(0), str(1))
}
val group: RDD[(String, Iterable[String])] = splitrdd.groupByKey()
group.foreach(println)

标签: scalaapache-sparkrdd

解决方案


首先swap是元素:

val swapped = splitRDD.map(_.swap)

然后自加入并swap返回:

val shared =  swapped.join(swapped).map(_.swap)

最后过滤掉重复项(如果需要)和groupByKey

shared.filter { case ((x, y), _) => x < y }.groupByKey

推荐阅读