首页 > 解决方案 > pyspark如何加入关键但还包括其他列?

问题描述

我有两个想要加入的 RDD

RDD1
((a, b, c, d, e), 5)
((a, b, c, d1, e), 12)
((a, b, c, d2, e), 29)

RDD2
((a, b, c, f, e), 100)

我希望最终加入结果如下:

((a, b, c, d, e), 5, 100)
((a, b, c, d1, e), 12, 100)
((a, b, c, d2, e), 29, 100)

所以连接键是'a,b,c,e'。只要键匹配,我想忽略第 4 列并将总 100 加入 RDD1。

我知道如何在 sql 中执行此操作,但不确定如何在 pyspark 中执行此操作。这是我的进度,但我无法得到结果,因为我不确定如何加入密钥,并且如上所述将第 4 列重新放入。

rdd1 = sc.parallelize((a, b, c, d, e), 5),((a, b, c, d1, e), 12),((a, b, c, d2, e), 29))
rdd2 = sc.parallelize(((a, b, c, f, e), 100))

rdd1.coalesce(50).map(lambda x: [x[0][0], x[0][1], x[0][2], x[0][4], x[1]]) \
.join(rdd2.map(lambda x: [x[0][0], x[0][1], x[0][2], x[0][4], x[1]])

(a, b, c, e, 5, 100)
(a, b, c, e, 12, 100)
(a, b, c, e, 29, 100)

有小费吗?

标签: pyspark

解决方案


我稍微更改了您的输入(假设 a、b、c 是字符串),并添加了一些括号。

至于解决方案,您可以将整个 rdd1 保留在第一个地图中,以便在加入后重新创建您的密钥:

rdd1 = sc.parallelize([(("a", "b", "c", "d", "e"), 5), (("a", "b", "c", "d1", "e"), 12), (("a", "b", "c", "d2", "e"), 29)])
rdd2 = sc.parallelize([(("a", "b", "c", "f", "e"), 100)])

rdd_res = (rdd1.map(lambda x: ((x[0][0], x[0][1], x[0][2], x[0][4]), x))  # take the whole thing since we want to keep it
           .join(rdd2.map(lambda x: ((x[0][0], x[0][1], x[0][2], x[0][4]), x[1]))) # take just the last part to append
           .map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])) # get rid of the temporary key and reformat so last two parts are flat
           )

rdd_res.collect()
#[(('a', 'b', 'c', 'd', 'e'), 5, 100), (('a', 'b', 'c', 'd1', 'e'), 12, 100), (('a', 'b', 'c', 'd2', 'e'), 29, 100)]

推荐阅读