pyspark - 在 PySpark 配对的 RDD 中搜索值,以获取来自另一个 RDD 的键
问题描述
我是 PySpark 的新手,我想做以下事情,
考虑以下代码,
import numpy as np
b =np.array([[1,2,100],[3,4,200],[5,6, 300],[7,8, 400]])
a = np.array([[1,2],[3,4],[11,6],[7,8], [1, 2], [7,8]])
RDDa = sc.parallelize(a)
RDDb = sc.parallelize(b)
dsmRDD = RDDb.map(lambda x: (list(x[:2]), x[2]))
我想获取与 RDDa 的每个值关联的值作为 dsmRDD 的键,即
result = [100, 200, 0, 400, 100, 400]
非常感谢你。
解决方案
正如另一个答案所暗示的那样,您可以转换为数据框和join
. 如果您只愿意继续rdd
,您可以这样做,
import numpy as np
a = np.array([[1,2],[3,4],[11,6],[7,8], [1, 2], [7,8]])
b = np.array([[1,2,100],[3,4,200],[5,6, 300],[7,8, 400]])
RDDa = sc.parallelize(a)
RDDb = sc.parallelize(b)
dsmRDD = RDDa.zipWithIndex()\
.map(lambda x: (tuple(x[0].tolist()),(0,x[1])))\
.leftOuterJoin(RDDb.map(lambda x: (tuple(x[:2].tolist()), x[2])))\
.map(lambda x: (x[1][0][1], x[1][1]) if x[1][1] is not None else (x[1][0][1],x[1][0][0]))
output = map(lambda x:x[1], sorted(dsmRDD.collect()))
print output
这给了你输出,
[100, 200, 0, 400, 100, 400]
推荐阅读
- couchdb - 如何通过复制 .couch 文件来恢复备份数据库
- go - 如何使用elasticsearch在数组中查找元素
- c# - 如何在 C# 中打开和关闭任何应用程序?
- javascript - TypeError: data.data.map is not a function ,我第一次在js上写,反应如何访问字段谁会告诉你。“n”:“成人”
- java - 为什么这个 Worker 在第一次触发时会被触发两次?
- cpu - 关于spec2017的困惑
- kubernetes - 为什么在 dockerfile 卷上挂载时 emptydir 不为空?
- linux - xfce4-terminal Shift+space 隐藏光标
- flutter - 如何在 Flutter 中的特定输入文本字段上设置焦点
- javascript - 我正在加载选项卡式内容....但是默认情况下,每个选项卡内容都一起加载...当我单击选项卡时...然后它遵循选项卡式内容方式