pyspark - pyspark如何加入关键但还包括其他列?
问题描述
我有两个想要加入的 RDD
RDD1
((a, b, c, d, e), 5)
((a, b, c, d1, e), 12)
((a, b, c, d2, e), 29)
RDD2
((a, b, c, f, e), 100)
我希望最终加入结果如下:
((a, b, c, d, e), 5, 100)
((a, b, c, d1, e), 12, 100)
((a, b, c, d2, e), 29, 100)
所以连接键是'a,b,c,e'。只要键匹配,我想忽略第 4 列并将总 100 加入 RDD1。
我知道如何在 sql 中执行此操作,但不确定如何在 pyspark 中执行此操作。这是我的进度,但我无法得到结果,因为我不确定如何加入密钥,并且如上所述将第 4 列重新放入。
rdd1 = sc.parallelize((a, b, c, d, e), 5),((a, b, c, d1, e), 12),((a, b, c, d2, e), 29))
rdd2 = sc.parallelize(((a, b, c, f, e), 100))
rdd1.coalesce(50).map(lambda x: [x[0][0], x[0][1], x[0][2], x[0][4], x[1]]) \
.join(rdd2.map(lambda x: [x[0][0], x[0][1], x[0][2], x[0][4], x[1]])
(a, b, c, e, 5, 100)
(a, b, c, e, 12, 100)
(a, b, c, e, 29, 100)
有小费吗?
解决方案
我稍微更改了您的输入(假设 a、b、c 是字符串),并添加了一些括号。
至于解决方案,您可以将整个 rdd1 保留在第一个地图中,以便在加入后重新创建您的密钥:
rdd1 = sc.parallelize([(("a", "b", "c", "d", "e"), 5), (("a", "b", "c", "d1", "e"), 12), (("a", "b", "c", "d2", "e"), 29)])
rdd2 = sc.parallelize([(("a", "b", "c", "f", "e"), 100)])
rdd_res = (rdd1.map(lambda x: ((x[0][0], x[0][1], x[0][2], x[0][4]), x)) # take the whole thing since we want to keep it
.join(rdd2.map(lambda x: ((x[0][0], x[0][1], x[0][2], x[0][4]), x[1]))) # take just the last part to append
.map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])) # get rid of the temporary key and reformat so last two parts are flat
)
rdd_res.collect()
#[(('a', 'b', 'c', 'd', 'e'), 5, 100), (('a', 'b', 'c', 'd1', 'e'), 12, 100), (('a', 'b', 'c', 'd2', 'e'), 29, 100)]
推荐阅读
- ios - 将子视图定位在圆形视图的边缘
- kotlin - 如何在有效负载和从数据库中获取的实体之间找到更新的字段,并创建一个对象,该对象的字段具有更新的值,其余为 Null
- python - 当我从目录外导入此函数时,为什么我的 python 导入语句会失败?
- c# - 如何使用计时器和按键事件移动对象(如蛇)
- c++ - 如何绘制多个矩形 FLTK C++
- java - 使用缓存作为数据库前面的一层
- angular - Angular如何计算数组中的数组数量
- javascript - 如何在javascript循环中进行一些算术测量
- unit-testing - 在 Azure Dev Ops 中对 .net Core 应用程序进行单元测试时的问题
- powerbi - 如何筛选 MS Dynamics 客户门户上的嵌入式 Power BI 报表