Flattening a DataFrame with multidimensional arrays

Problem description

I am currently loading an XML file with a complex schema into a DataFrame.

root
 |-- tID: string (nullable = true)
 |-- Book: long (nullable = true)
 |-- Legs: struct (nullable = true)
 |    |-- Leg: struct (nullable = true)
 |    |-- AssetID: string (nullable = true)
 |    |-- FirstOrder: string (nullable = true)
 |    |    |-- risk: struct (nullable = true)
 |    |    |    |-- scenarios: struct (nullable = true)
 |    |    |    |    |-- scenario: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- _sc: string (nullable = true)
 |    |    |    |    |    |    |-- _sens: double (nullable = true)
 |    |-- SecondOrder: string (nullable = true)

I want to flatten the DataFrame and found a code snippet online (here):

from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import ArrayType, StructType

def relationalize(df):
    connector = '-'

    def flattenSchema(schema, prefix=None):
        fields = []
        for field in schema.fields:
            # The field name is wrapped in backticks (``) to handle the case
            # where prohibited characters like '.' are part of the JSON key.
            if '.' in field.name:
                name = prefix + '.' + '`' + field.name + '`' if prefix else '`' + field.name + '`'
            else:
                name = prefix + '.' + field.name if prefix else field.name
            dtype = field.dataType

            # Look through arrays at the element type.
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType

            # Recurse into structs; collect everything else as a leaf column.
            if isinstance(dtype, StructType):
                fields += flattenSchema(dtype, prefix=name)
            else:
                fields.append(name)
        return fields

    newDf = df
    for col_name in flattenSchema(df.schema):
        newDf = newDf.withColumn(col_name.replace('`', '').replace('.', connector), col(col_name))

    for field in newDf.schema:
        if isinstance(field.dataType, StructType):
            newDf = newDf.drop(field.name)
        elif isinstance(field.dataType, ArrayType):
            newDf = newDf.withColumn(field.name, explode_outer(field.name))

    return newDf

However, in my DataFrame the arrays contain 2 elements each. When I execute the function, it explodes both array fields independently and therefore creates duplicate rows.

For example, the following:

  sc     sens
-----    -----
{1,2}    {a,b}

produces this output:

1 a
2 a
1 b
2 b

whereas the desired output would be:

1 a
2 b
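The duplication can be reproduced in plain Python: exploding each array column separately behaves like a Cartesian product of the two arrays, while the desired pairing is an element-wise zip. A minimal illustration (not Spark code):

```python
# Exploding two array columns one after the other is equivalent to
# taking the Cartesian product of their elements; the desired result
# pairs elements index-wise instead.
from itertools import product

sc = [1, 2]
sens = ['a', 'b']

cartesian = list(product(sc, sens))   # what two independent explodes yield
zipped = list(zip(sc, sens))          # the desired element-wise pairing

print(cartesian)  # [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]
print(zipped)     # [(1, 'a'), (2, 'b')]
```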

I know the problem is in the following snippet:

for field in newDf.schema:
    if isinstance(field.dataType, StructType):
        newDf = newDf.drop(field.name)
    elif isinstance(field.dataType, ArrayType):
        newDf = newDf.withColumn(field.name, explode_outer(field.name))

However, I don't know how to tell that the array columns are "linked" and should be exploded together rather than independently. How can I solve this?

Tags: python, apache-spark, data-structures, pyspark

Solution
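In Spark 2.4+ the usual fix is to combine the parallel arrays into a single array of structs with `pyspark.sql.functions.arrays_zip` and then call `explode` once, so elements are paired index-wise instead of multiplied out. A minimal sketch of that idea in plain Python, to show the intended semantics (the helper name `explode_zipped` is illustrative, not a Spark API):

```python
# Pure-Python model of the arrays_zip-then-explode fix: instead of
# exploding each array column independently (a Cartesian product),
# zip the parallel arrays element-wise and emit one row per index.
def explode_zipped(rows, array_cols):
    """Explode several parallel, equal-length array columns together."""
    out = []
    for row in rows:
        # zip pairs the i-th element of every array column
        for values in zip(*(row[c] for c in array_cols)):
            new_row = dict(row)
            for c, v in zip(array_cols, values):
                new_row[c] = v
            out.append(new_row)
    return out

rows = [{'sc': [1, 2], 'sens': ['a', 'b']}]
result = explode_zipped(rows, ['sc', 'sens'])
print(result)  # [{'sc': 1, 'sens': 'a'}, {'sc': 2, 'sens': 'b'}]
```

In the function above, this would mean collecting all `ArrayType` fields in one pass, zipping them with `arrays_zip`, exploding the zipped column once, and then projecting the struct fields back out as separate columns.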

