How to iterate over an array column in Pyspark when joining

Problem description

In pyspark, I have dataframe_a:

+-----------+----------------------+
| str1      | array_of_str         |
+-----------+----------------------+
| John      | [mango, apple]       |
| Tom       | [mango, orange]      |
| Matteo    | [apple, banana]      | 

dataframe_b

+-----------+----------------------+
| key       | value                |
+-----------+----------------------+
| mango     | 1                    |
| apple     | 2                    |
| orange    | 3                    | 
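
For anyone who wants to reproduce this, here is a minimal sketch of how the two frames could be built (the SparkSession setup is my assumption, not part of the original post):

from pyspark.sql import SparkSession

# Minimal setup to recreate the two example frames above
spark = SparkSession.builder.getOrCreate()

dataframe_a = spark.createDataFrame(
    [('John',   ['mango', 'apple']),
     ('Tom',    ['mango', 'orange']),
     ('Matteo', ['apple', 'banana'])],
    ['str1', 'array_of_str'])

dataframe_b = spark.createDataFrame(
    [('mango', 1), ('apple', 2), ('orange', 3)],
    ['key', 'value'])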

I want to create a new column of Array type, joined_result, that maps each element of array_of_str (in dataframe_a) to its value in dataframe_b, e.g.:

+-----------+----------------------+----------------------------------+
| str1      | array_of_str         | joined_result                    |
+-----------+----------------------+----------------------------------+
| John      | [mango, apple]       | [1, 2]                           |
| Tom       | [mango, orange]      | [1, 3]                           |
| Matteo    | [apple, banana]      | [2]                              |

I don't know how to do it. I know I can use a udf with a lambda function, but I can't get it to work :( Help!

from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType

# START EXTRACT OF CODE
ret = (df
  .select(['str1', 'array_of_str'])
  .withColumn('joined_result', F.udf(
     map(lambda x: ??????, ArrayType(StringType))
  )
)

return ret
# END EXTRACT OF CODE

Thanks in advance

Tags: pyspark, pyspark-dataframes

Solution


My answer to your question:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

# Pull dataframe_b down to the driver and turn it into a plain dict: {key: value}
lookup_list = map(lambda row: row.asDict(), dataframe_b.collect())
lookup_dict = {lookup['key']: lookup['value'] for lookup in lookup_list}

def mapper(keys):
    # Look up each key, skipping any (like 'banana') that have no match in dataframe_b
    return [lookup_dict[key] for key in keys if key in lookup_dict]

dataframe_a = dataframe_a.withColumn(
    'joined_result', F.udf(mapper, ArrayType(IntegerType()))('array_of_str'))

It works as you want :-)
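
If dataframe_b is too big to collect to the driver, a join avoids the UDF entirely. A minimal sketch under that assumption (note that collect_list does not guarantee the order of the resulting array):

from pyspark.sql import functions as F

# One row per array element, joined against the lookup frame,
# then the matched values are gathered back into an array per row
result = (dataframe_a
    .withColumn('key', F.explode('array_of_str'))
    .join(dataframe_b, on='key', how='inner')  # inner join drops unmatched keys like 'banana'
    .groupBy('str1', 'array_of_str')
    .agg(F.collect_list('value').alias('joined_result')))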

