首页 > 解决方案 > 从 DataFrame 列创建 PySpark 映射并应用于另一个 DataFrame

问题描述

我最近遇到了一个问题,我想将一个 DataFrame 的许多列与另一个 DataFrame 的列进行映射——本质上是一个查找表,允许我用另一组 ID 替换一组 ID。这可以通过与joins我希望映射的列数相匹配的数量来轻松完成。在Scala Spark中,这可以通过 来实现Map。可以做类似的事情PySpark吗?

标签: pythonapache-sparkpyspark

解决方案


这可以通过利用

  • pyspark.sql.functions.map_from_entries
  • pyspark.sql.functions.collect_list
  • pyspark.sql.functions.struct
  • crossJoin

按以下顺序:

# original_and_new_df is a 2 column table containing rows original_id, new_id
# input_df is a table containg colums with original_ids, 
# such as f.col(id_col_1), f.col(id_col_2)

input_df_with_mapping_col = input_df.crossJoin(
    original_and_new_df.select(
        f.map_from_entries(
            f.collect_list(
                f.struct(f.col(original_id), f.col(new_id)))
        ).alias(mapping_column_name)
    )
)

# apply the mappings 
input_df_with_mapping_col.select(
    f.col(mapping_column_name)[f.col(id_col_1)].alias(id_col_1),
    f.col(mapping_column_name)[f.col(id_col_2)].alias(id_col_2),
)

推荐阅读