首页 > 解决方案 > 从 Pyspark Column 获取值并将其与 Python 字典进行比较

问题描述

所以我有一个 pyspark 数据框,我想添加另一列以使用 Section_1 列中的值并在 python 字典中找到其对应的值。所以基本上使用 Section_1 单元格中的值作为键,然后在新列中填写 python 字典中的值,如下所示。

原始数据框

数据标识 对象标识 姓名 目的 Section_1
我的数据 数据名称 对象名称 rd.111 rd.123

Python字典

object_map= {'rd.123' : 'rd.567'}

其中第 1 节的值为 rd.123,我将在字典中搜索键“rd.123”并希望返回 rd.567 的值并将其放在新列中

所需的数据框

数据标识 对象标识 姓名 目的 Section_1 Section_2
我的数据 数据名称 对象名称 rd.111 rd.123 rd.567

现在我的当前代码出现了这个错误,我真的不知道我做错了什么,因为我不熟悉 pyspark

您的代码中对 Column 对象的调用不正确。请检查您的代码。

这是我目前正在使用的代码,其中 object_map 是 python 字典。

test_df = output.withColumn('Section_2', object_map.get(output.Section_1.collect()))

标签: pythonapache-sparkdictionarypysparkapache-spark-sql

解决方案


你可以试试这个(改编自这个答案,增​​加了空处理):

from itertools import chain
from pyspark.sql.functions import create_map, lit, when

object_map = {'rd.123': 'rd.567'}
mapping_expr = create_map([lit(x) for x in chain(*object_map.items())])

df1 = df.filter(df['Section_1'].isNull()).withColumn('Section_2', F.lit(None))

df2 = df.filter(df['Section_1'].isNotNull()).withColumn(
    'Section_2', 
    when(
        df['Section_1'].isNotNull(), 
        mapping_expr[df['Section_1']]
    )
)

result = df1.unionAll(df2)

推荐阅读