首页 > 解决方案 > 如何在 PySpark 中将两个 rdd 合并为一个

问题描述

我得到两个RDD,并想RDD按如下方式连接并组合成一个:

rdd_1 = ['a1', 'a2', 'a3', 'a4', 'a5', ]
rdd_2 = ['b1', 'b2', 'b3', 'b4', 'b5', ]

# concat and combine these two rdd into one
rdd = ['a1_b1', 'a2_b2', 'a3_b3', 'a4_b4', 'a5_b5']

我知道我可以将这两个RDD转换为DataFrame并连接它,spark.sql如下所示:

df = df.withColumn('col1_col2', concat(col('col1'), lit(' '), col('col2')))

但是对于亿级样本来说效率不够。所以我想知道编程
中是否有更快的方法。RRD

标签: apache-sparkpysparkapache-spark-sqlrdd

解决方案


从列表中创建 rdds,然后在两个 rdds 上执行 zip,然后使用 map 和 join 对其进行迭代和连接。

rd1 = sc.parallelize(['a1', 'a2', 'a3', 'a4', 'a5', ])
rd2 = sc.parallelize(['b1', 'b2', 'b3', 'b4', 'b5', ])
rd1.zip(rd2).map(lambda x: x[0]+'_'+x[1]).collect()
rd1.zip(rd2).map(lambda x: '_'.join(x)).collect()
rd1.zip(rd2).map('_'.join).collect()

['a1_b1', 'a2_b2', 'a3_b3', 'a4_b4', 'a5_b5']

推荐阅读