首页 > 解决方案 > 使用poseexplode分解带有索引的嵌套JSON

问题描述

我使用下面的函数来分解深度嵌套的 JSON(具有嵌套的结构和数组)。

# Flatten nested df  
def flatten_df(nested_df): 
        
    for col in nested_df.columns:
        array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
    for col in array_cols:
        nested_df =nested_df.withColumn(col, F.explode_outer(nested_df[col]))

    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    
    if len(nested_cols) == 0:
        return nested_df

    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']

    flat_df = nested_df.select(flat_cols +
                            [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])

    return flatten_df(flat_df)

我成功地爆炸了。但我还想在展开的数据框中添加元素的顺序或索引。所以在上面的代码中,我将explode_outer函数替换为posexplode_outer. 但我收到以下错误

An error was encountered:
'The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 2 aliases'

我尝试更改为nested_df.withColumnnested_df.select但没有成功。谁能帮我分解嵌套的json,但同时将数组元素的顺序保持为分解数据框中的一列。

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


错误是因为posexplode_outer返回两列pos和col,所以不能与Column()一起使用。这可以在选择中使用,如下面的代码所示

from pyspark.sql import functions as F
from pyspark.sql.window import Window
tst= sqlContext.createDataFrame([(1,7,80),(1,8,40),(1,5,100),(5,8,90),(7,6,50),(0,3,60)],schema=['col1','col2','col3'])
tst_new = tst.withColumn("arr",F.array(tst.columns))
expr = tst.columns
expr.append(F.posexplode_outer('arr'))
#%%
tst_explode = tst_new.select(*expr)

结果:

tst_explode.show()
+----+----+----+---+---+
|col1|col2|col3|pos|col|
+----+----+----+---+---+
|   1|   7|  80|  0|  1|
|   1|   7|  80|  1|  7|
|   1|   7|  80|  2| 80|
|   1|   8|  40|  0|  1|
|   1|   8|  40|  1|  8|
|   1|   8|  40|  2| 40|
|   1|   5| 100|  0|  1|
|   1|   5| 100|  1|  5|
|   1|   5| 100|  2|100|
|   5|   8|  90|  0|  5|
|   5|   8|  90|  1|  8|
|   5|   8|  90|  2| 90|
|   7|   6|  50|  0|  7|
|   7|   6|  50|  1|  6|
|   7|   6|  50|  2| 50|
|   0|   3|  60|  0|  0|
|   0|   3|  60|  1|  3|
|   0|   3|  60|  2| 60|
+----+----+----+---+---+

如果需要重命名列,可以使用 .withColumnRenamed() 函数

df_final=(tst_explode.withColumnRenamed('pos','position')).withColumnRenamed('col','column')

推荐阅读