pyspark - 如何在加入时迭代 Pyspark 中的数组列
问题描述
在 pyspark 中,我有dataframe_a:
+-----------+----------------------+
| str1 | array_of_str |
+-----------+----------------------+
| John | [mango, apple] |
| Tom | [mango, orange] |
| Matteo | [apple, banana] |
和dataframe_b与
+-----------+----------------------+
| key | value |
+-----------+----------------------+
| mango | 1 |
| apple | 2 |
| orange | 3 |
我想创建一个 Array 类型的新列,将(dataframe_a)中的joined_result
每个元素映射到dataframe_b中的值,例如:array_of_str
+-----------+----------------------+----------------------------------+
| str1 | array_of_str | joined_result |
+-----------+----------------------+----------------------------------+
| John | [mango, apple] | [1, 2] |
| Tom | [mango, orange] | [1, 3] |
| Matteo | [apple, banana] | [2] |
我不知道该怎么做,我知道我可以使用带有 lambda 函数的 udf,但我无法让它工作:( 帮助!
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType
# START EXTRACT OF CODE
ret = (df
.select(['str1', 'array_of_str'])
.withColumn('joined_result', F.udf(
map(lambda x: ??????, ArrayType(StringType))
)
)
return ret
# END EXTRACT OF CODE
先感谢您
解决方案
我在你的问题中的回答:
lookup_list = map(lambda row: row.asDict(), dataframe_b.collect())
lookup_dict = {lookup['key']:lookup['value'] for lookup in lookup_list}
def mapper(keys):
return [lookup_dict[key][0] for key in keys]
dataframe_a = dataframe_a.withColumn('joined_result', F.udf(mapper)("arr_of_str"))
它可以按您的意愿工作:-)
推荐阅读
- html - VML 背景图像位于 Outlook 底部
- python - 具有动态查询集 ModelMultipleChoiceField 的 Django 表单
- amazon-web-services - 谷歌云登录小部件,如 Facebook
- c++ - 为什么一种基于for循环的范围类型在大括号初始化列表上是非法的c ++
- python - 根据条目长度将字符串添加到列
- angular - Angular OnInit:从服务订阅返回的值返回 Empty 空白,因为 API 尚未调用
- sql - 删除sql单元格中的重复数字
- html - 我们可以在没有表单操作提交的情况下将数据从文本框传递到节点 js 吗?
- python - struct.error: 在 Python3.5 中解压需要一个长度为 16 的字节对象
- excel - 如何将相应的值添加到右列(我有一个代码,但速度不够快)