首页 > 解决方案 > 合并具有不同列的两个 spark 数据框以获取所有列

问题描述

假设我有 2 个 spark 数据框:

Location    Date        Date_part   Sector      units   
USA         7/1/2021    7/1/2021    Cars        200     
IND         7/1/2021    7/1/2021    Scooters    180     
COL         7/1/2021    7/1/2021    Trucks      100     
Location    Date    Brands  units   values    
UK          null    brand1  400     120       
AUS         null    brand2  450     230       
CAN         null    brand3  150     34        

我需要我的结果数据框

Location    Date        Date_part   Sector      Brands  units   values
USA         7/1/2021    7/1/2021    Cars                200     
IND         7/1/2021    7/1/2021    Scooters            180     
COL         7/1/2021    7/1/2021    Trucks              100
UK          null        7/1/2021                brand1  400     120
AUS         null        7/1/2021                brand2  450     230
CAN         null        7/1/2021                brand3  150     34

所以我想要的 df 应该包含来自两个数据框的所有列,我还需要所有行中的 Date_part 这是我尝试过的:

df_result= df1.union(df_2)

但我得到了这个作为我的结果。正在交换值,并且缺少第二个数据框中的一列。

Location    Date        Date_part   Sector      Brands  units
USA         7/1/2021    7/1/2021    Cars        200     
IND         7/1/2021    7/1/2021    Scooters    180     
COL         7/1/2021    7/1/2021    Trucks      100
UK          null        brand1                  400     120
AUS         null        brand2                  450     230
CAN         null        brand3                  150     34

任何建议请

标签: pythonapache-sparkpyspark

解决方案


union:此函数按位置(不是按名称)解析列

这就是您认为“正在交换值并且第二个数据帧中的一列丢失”的原因。

您应该使用unionByName,但此功能要求两个数据框具有相同的结构。

我为您提供了这个简单的代码来协调数据框的结构,然后执行 union(ByName)。

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def add_missing_columns(df: DataFrame, ref_df: DataFrame) -> DataFrame:
    """Add missing columns from ref_df to df

    Args:
        df (DataFrame): dataframe with missing columns
        ref_df (DataFrame): referential dataframe

    Returns:
        DataFrame: df with additionnal columns from ref_df
    """
    for col in ref_df.schema:
        if col.name not in df.columns:
            df = df.withColumn(col.name, F.lit(None).cast(col.dataType))

    return df


df1 = add_missing_columns(df1, df2)
df2 = add_missing_columns(df2, df1)

df_result = df1.unionByName(df2)

推荐阅读