Using reduceByKey instead of leftOuterJoin in Spark to speed up processing

Problem description

I have the following RDDs and want to combine them with leftOuterJoin. I would like to know whether reduceByKey would be more efficient or faster than leftOuterJoin.

rd0 = sc.parallelize([('s1', 'o1'), ('s1', 'o2'), ('s2', 'o2'), ('s3', 'o3')])
rd1 = sc.parallelize([('s1', 'oo1'), ('s10', 'oo10'), ('s2', 'oo2')])
The reduceByKey method:
rd00 = rd0.map(lambda x: (x[0], ([x[1]], [])))
rd11 = rd1.map(lambda x: (x[0], ([], [x[1]])))
rd00.union(rd11).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
Out[22]:
[('s1', (['o1'], [])),
 ('s1', (['o2'], [])),
 ('s2', (['o2'], [])),
 ('s3', (['o3'], [])),
 ('s1', ([], ['oo1'])),
 ('s10', ([], ['oo10'])),
 ('s2', ([], ['oo2']))]

versus calling rd0.leftOuterJoin(rd1) directly.
For large rd0 and rd1 datasets, would reduceByKey be faster than leftOuterJoin?
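
Before comparing speed, it may help to check that the two approaches return the same thing. The sketch below emulates both on the sample data in plain Python, without Spark. The helpers `reduce_by_key` and `left_outer_join` are hypothetical local stand-ins written for this illustration, not PySpark functions, and they only mirror the documented behavior of the corresponding RDD methods.

from collections import defaultdict
from functools import reduce

# Same sample data as above, as plain lists instead of RDDs.
rd0 = [('s1', 'o1'), ('s1', 'o2'), ('s2', 'o2'), ('s3', 'o3')]
rd1 = [('s1', 'oo1'), ('s10', 'oo10'), ('s2', 'oo2')]

def reduce_by_key(pairs, func):
    """Local stand-in for RDD.reduceByKey: fold all values of a key with func."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

def left_outer_join(left, right):
    """Local stand-in for RDD.leftOuterJoin: (l, r) per match, (l, None) otherwise."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    joined = []
    for key, value in left:
        for other in right_by_key.get(key, [None]):
            joined.append((key, (value, other)))
    return joined

# Approach 1: tag each side, union, then merge per key.
rd00 = [(k, ([v], [])) for k, v in rd0]
rd11 = [(k, ([], [v])) for k, v in rd1]
merged = reduce_by_key(rd00 + rd11, lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Approach 2: a left outer join on the untagged data.
joined = left_outer_join(rd0, rd1)

print(merged)
print(joined)

In this emulation each key appears once in `merged`, for example 's1' maps to (['o1', 'o2'], ['oo1']), whereas the Out[22] listing above still shows 's1' three times, which looks like the result of the union alone. Note also that `merged` keeps 's10', a key that exists only in rd1, while the left outer join drops it and instead emits ('s3', ('o3', None)) for the unmatched left key. The union plus reduceByKey approach therefore behaves like a full outer grouping, so keys without a left-hand value would need to be filtered out to match leftOuterJoin.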

Tags: apache-spark, pyspark

Solution


If we inspect the execution plans of the two approaches, there should be no difference between them.

This can be seen with toDebugString:

print(rd00.union(rd11).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).toDebugString())

which prints

(4) PythonRDD[15] at RDD at PythonRDD.scala:49 []
 |  MapPartitionsRDD[14] at mapPartitions at PythonRDD.scala:129 []
 |  ShuffledRDD[13] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[12] at reduceByKey at <stdin>:1 []
    |  PythonRDD[11] at reduceByKey at <stdin>:1 []
    |  UnionRDD[10] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[2] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184 []
    |  PythonRDD[3] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:184 []

and for the left outer join

print(rd00.leftOuterJoin(rd11).toDebugString())

which prints

(4) PythonRDD[23] at RDD at PythonRDD.scala:49 []
 |  MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:129 []
 |  ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[20] at leftOuterJoin at <stdin>:1 []
    |  PythonRDD[19] at leftOuterJoin at <stdin>:1 []
    |  UnionRDD[18] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[16] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184 []
    |  PythonRDD[17] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:184 []
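
The two printouts can also be compared mechanically instead of by eye. The sketch below copies both lineages from above into strings, extracts the RDD class name from each line while ignoring the numeric ids and call sites, and checks that the sequences match. The helper name `operator_sequence` is hypothetical, introduced only for this illustration.

import re

# Lineage printed for union + reduceByKey (copied from above).
reduce_plan = """
(4) PythonRDD[15] at RDD at PythonRDD.scala:49 []
 |  MapPartitionsRDD[14] at mapPartitions at PythonRDD.scala:129 []
 |  ShuffledRDD[13] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[12] at reduceByKey at <stdin>:1 []
    |  PythonRDD[11] at reduceByKey at <stdin>:1 []
    |  UnionRDD[10] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[2] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184 []
    |  PythonRDD[3] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:184 []
"""

# Lineage printed for leftOuterJoin (copied from above).
join_plan = """
(4) PythonRDD[23] at RDD at PythonRDD.scala:49 []
 |  MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:129 []
 |  ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[20] at leftOuterJoin at <stdin>:1 []
    |  PythonRDD[19] at leftOuterJoin at <stdin>:1 []
    |  UnionRDD[18] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[16] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184 []
    |  PythonRDD[17] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:184 []
"""

def operator_sequence(debug_string):
    """Return the RDD class names in a lineage printout, ignoring ids and call sites."""
    return re.findall(r"(\w+RDD)\[\d+\]", debug_string)

reduce_ops = operator_sequence(reduce_plan)
join_ops = operator_sequence(join_plan)

print(reduce_ops)
print(reduce_ops == join_ops)

Both lineages contain a UnionRDD feeding a single partitionBy shuffle. That is consistent with my understanding that PySpark builds its RDD joins by tagging each side, taking the union, and grouping by key, which is essentially what the hand-written reduceByKey version does; treat that explanation as an assumption rather than something the printouts prove. An identical lineage shape also says nothing about the per-record work inside each stage, so a timing run on representative data is still the safest way to compare the two.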
