首页 > 解决方案 > 在pyspark中水平连接多个数据框

问题描述

我正在尝试使用 monotonically_increasing_id() 在 pyspark 中水平连接多个数据帧(具有相同数量的记录)。然而,获得的结果夸大了记录的数量

for i in range(len(lst)+1):
    if i==0:
        df[i] = cust_mod.select('key')
        df[i+1] = df[i].withColumn("idx", monotonically_increasing_id())

    else:
        df_tmp = o[i-1].select(col("value").alias(obj_names[i-1]))
        df_tmp = df_tmp.withColumn("idx", monotonically_increasing_id())

        df[i+1] = df[i].join(df_tmp, "idx", "outer")

df[i+1]=~60m 中的预期记录数。得到:~88m。似乎单调增加的 id 并不总是产生相同的数字。我该如何解决这个问题?

其他详情:

cust_mod > dataframe, count- ~60m
o[i] - another set of dataframes, with length equal to cust_mod
lst - a list than has 49 components . So in total 49 loops

我尝试使用 zipWithIndex():

for i in range(len(lst)+1):
    if i==0:
        df[i] = cust_mod.select('key')
        df[i+1] = df[i].rdd.zipWithIndex().toDF()

    else:
        df_tmp = o[i-1].select("value").rdd.zipWithIndex().toDF()
        df_tmp1 = df_tmp.select(col("_1").alias(obj_names[i-1]),col("_2"))

        df[i+1] = df[i].join(df_tmp1, "_2", "inner").drop(df_tmp1._2)

但它的方式很慢。就像慢了 50 倍。

标签: joinindexingpysparkapache-spark-sql

解决方案


推荐阅读