Mapping a column from arrays in PySpark

Problem description

I am new to working with PySpark DataFrames and am looking for some help mapping a column based on two PySpark DataFrames, one of which is a reference df whose columns hold arrays.

Reference dataframe (the number of subgroups differs per group):

| Group | Subgroup | Size               | Type              |
| ----- | -------- | ------------------ | ----------------- |
| A     | A1       | ['Small','Medium'] | ['A','B']         |
| A     | A2       | ['Small','Medium'] | ['C','D']         |
| B     | B1       | ['Small']          | ['A','B','C','D'] |

Source dataframe:

| ID     | Size     | Type |
| ------ | -------- | ---- |
| ID_001 | 'Small'  | 'A'  |
| ID_002 | 'Medium' | 'B'  |
| ID_003 | 'Small'  | 'D'  |

In the result, each ID belongs to every group, but its subgroup within each group is determined exclusively by the reference df, so the output should look like this:

| ID     | Size     | Type | A_Subgroup | B_Subgroup |
| ------ | -------- | ---- | ---------- | ---------- |
| ID_001 | 'Small'  | 'A'  | 'A1'       | 'B1'       |
| ID_002 | 'Medium' | 'B'  | 'A1'       | Null       |
| ID_003 | 'Small'  | 'D'  | 'A2'       | 'B1'       |
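
For reference, the two example DataFrames above could be constructed like this (a minimal sketch; the names ref and source are assumptions and match the solution below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Reference dataframe: Size and Type are array columns
ref = spark.createDataFrame(
    [
        ('A', 'A1', ['Small', 'Medium'], ['A', 'B']),
        ('A', 'A2', ['Small', 'Medium'], ['C', 'D']),
        ('B', 'B1', ['Small'], ['A', 'B', 'C', 'D']),
    ],
    ['Group', 'Subgroup', 'Size', 'Type']
)

# Source dataframe: one row per ID with scalar Size and Type
source = spark.createDataFrame(
    [
        ('ID_001', 'Small', 'A'),
        ('ID_002', 'Medium', 'B'),
        ('ID_003', 'Small', 'D'),
    ],
    ['ID', 'Size', 'Type']
)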

Tags: apache-spark, pyspark, apache-spark-sql

Solution


You can do a join using array_contains conditions and pivot the result:

import pyspark.sql.functions as F

# Left-join each source row to every reference row whose Size and Type
# arrays contain that row's values, then pivot Group into columns,
# keeping the matched Subgroup for each group.
result = source.alias('source').join(
    ref.alias('ref'),
    F.expr("""
        array_contains(ref.Size, source.Size) and
        array_contains(ref.Type, source.Type)
    """),
    'left'
).groupBy(
    'ID', source['Size'], source['Type']
).pivot('Group').agg(F.first('Subgroup'))

result.show()
+------+------+----+---+----+
|    ID|  Size|Type|  A|   B|
+------+------+----+---+----+
|ID_003| Small|   D| A2|  B1|
|ID_002|Medium|   B| A1|null|
|ID_001| Small|   A| A1|  B1|
+------+------+----+---+----+
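
If the pivoted columns should carry the A_Subgroup / B_Subgroup names from the question, they can be renamed afterwards (a small sketch, assuming the only groups are A and B):

# Rename the pivoted columns to the names used in the question
result = (result
          .withColumnRenamed('A', 'A_Subgroup')
          .withColumnRenamed('B', 'B_Subgroup'))

result.show()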
