apache-spark - 在 Spark 中使用 reduceByKey 代替 leftOuterJoin 以加快处理速度
问题描述
我有以下 rdds 我想使用 leftOuterJoin 加入他们。我想知道 reduceByKey 是否会比 leftOuterJoin 更有效/更快。
rd0= sc.parallelize([ ('s1', 'o1' ),("s1", 'o2' ),('s2','o2'),("s3",'o3')])
rd1= sc.parallelize([ ('s1', 'oo1' ),("s10", 'oo10' ),('s2','oo2')])
reduceByKeyMethod
rd00 = rd0.map(lambda x:(x[0],([x[1]],[])))
rd11 = rd1.map(lambda x:(x[0],([],[x[1]])))
rd00.union(rd11).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).collect()
Out[22]:
[('s1', (['o1'], [])),
('s1', (['o2'], [])),
('s2', (['o2'], [])),
('s3', (['o3'], [])),
('s1', ([], ['oo1'])),
('s10', ([], ['oo10'])),
('s2', ([], ['oo2']))]
vs 直接rd0.leftOuterJoin(rd1)
使用 leftOuterJoin 对于大型 rd0 和 rd1 数据集,使用 reduceByKey 会更快吗?
解决方案
如果我们检查两种方法的执行计划 =>应该没有区别
如使用toDebugString所示
print(rd00.union(rd11).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).toDebugString())
印刷
(4) PythonRDD[15] at RDD at PythonRDD.scala:49 []
| MapPartitionsRDD[14] at mapPartitions at PythonRDD.scala:129 []
| ShuffledRDD[13] at partitionBy at NativeMethodAccessorImpl.java:0 []
+-(4) PairwiseRDD[12] at reduceByKey at <stdin>:1 []
| PythonRDD[11] at reduceByKey at <stdin>:1 []
| UnionRDD[10] at union at NativeMethodAccessorImpl.java:0 []
| PythonRDD[2] at RDD at PythonRDD.scala:49 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184 []
| PythonRDD[3] at RDD at PythonRDD.scala:49 []
| ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:184 []
并左外加入
print(rd00.leftOuterJoin(rd11).toDebugString())
印刷
(4) PythonRDD[23] at RDD at PythonRDD.scala:49 []
| MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:129 []
| ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:0 []
+-(4) PairwiseRDD[20] at leftOuterJoin at <stdin>:1 []
| PythonRDD[19] at leftOuterJoin at <stdin>:1 []
| UnionRDD[18] at union at NativeMethodAccessorImpl.java:0 []
| PythonRDD[16] at RDD at PythonRDD.scala:49 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184 []
| PythonRDD[17] at RDD at PythonRDD.scala:49 []
| ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:184 []
推荐阅读
- dhtmlx-scheduler - DHTMLX 调度程序不会加载数据
- javascript - 使用 Javascript 在 Internet Explorer 中打开应用程序/八位字节流 (pdf)
- javascript - 如何“通过单击它关闭的模态外部”来停止碳设计组件中模态元素的默认行为?
- django-rest-framework - 在 DRF 中可以批量创建吗?
- ansible - 如果我创建了多个实例,如何在 wait_for 模块中使用 with_items?
- openlayers - getFeaturesAtPixel() 包括整理(隐藏)的特征
- jenkins - 如何为 Jenkins 声明式管道中的步骤创建方法?
- python - 使用 GlobalObjectives 库中的损失时出错
- python - 我想在 python 3 的文本文件中搜索列表元素
- javascript - 当我的网站*打开*位置时,如何修改模式的文本/描述?