Pivoting two columns with numeric and categorical values in pySpark

Problem Description

I have a dataset like this in pyspark:

    from collections import namedtuple

    user_row = namedtuple('user_row', 'id time category value'.split())
    data = [
        user_row(1,1,'speed','50'),
        user_row(1,1,'speed','60'),
        user_row(1,2,'door', 'open'),
        user_row(1,2,'door','open'),
        user_row(1,2,'door','close'),
        user_row(1,2,'speed','75'),
        user_row(2,10,'speed','30'), 
        user_row(2,11,'door', 'open'),
        user_row(2,12,'door','open'),
        user_row(2,13,'speed','50'),
        user_row(2,13,'speed','40')
    ]
    
    user_df = spark.createDataFrame(data)
    user_df.show()
+---+----+--------+-----+
| id|time|category|value|
+---+----+--------+-----+
|  1|   1|   speed|   50|
|  1|   1|   speed|   60|
|  1|   2|    door| open|
|  1|   2|    door| open|
|  1|   2|    door|close|
|  1|   2|   speed|   75|
|  2|  10|   speed|   30|
|  2|  11|    door| open|
|  2|  12|    door| open|
|  2|  13|   speed|   50|
|  2|  13|   speed|   40|
+---+----+--------+-----+

What I want is the result below: group by id and time, pivot on category, and return the average when the value is numeric and the mode when it is categorical.

+---+----+--------+-----+
| id|time|    door|speed|
+---+----+--------+-----+
|  1|   1|    null|   55|
|  1|   2|    open|   75|
|  2|  10|    null|   30|
|  2|  11|    open| null|
|  2|  12|    open| null|
|  2|  13|    null|   45|
+---+----+--------+-----+

I tried the following, but for the categorical values it returns null (I'm not worried about the nulls in the speed column).

    from pyspark.sql.functions import avg

    df = user_df\
        .groupBy('id','time')\
        .pivot('category')\
        .agg(avg('value'))\
        .orderBy(['id', 'time'])

    df.show()

+---+----+----+-----+
| id|time|door|speed|
+---+----+----+-----+
|  1|   1|null| 55.0|
|  1|   2|null| 75.0|
|  2|  10|null| 30.0|
|  2|  11|null| null|
|  2|  12|null| null|
|  2|  13|null| 45.0|
+---+----+----+-----+
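
The nulls in the door column are expected: avg implicitly casts value to double, so non-numeric strings like 'open' and 'close' become null before averaging. A quick check of that behaviour (a small sketch, assuming the same SparkSession spark as above):

    import pyspark.sql.functions as F

    # 'open' cannot be cast to double, so it becomes null and avg skips it;
    # the result is the average of the single numeric row, 75.0
    spark.createDataFrame([('open',), ('75',)], ['value'])\
         .select(F.avg('value'))\
         .show()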

Tags: apache-spark, pyspark, apache-spark-sql, jupyter-notebook, pivot

Solution


You can add a second aggregation to the pivot and coalesce the resulting columns. Try this.

import pyspark.sql.functions as F
from collections import namedtuple

user_row = namedtuple('user_row', 'id time category value'.split())
data = [
    user_row(1,1,'speed','50'),
    user_row(1,1,'speed','60'),
    user_row(1,2,'door', 'open'),
    user_row(1,2,'door','open'),
    user_row(1,2,'door','close'),
    user_row(1,2,'speed','75'),
    user_row(2,10,'speed','30'), 
    user_row(2,11,'door', 'open'),
    user_row(2,12,'door','open'),
    user_row(2,13,'speed','50'),
    user_row(2,13,'speed','40')
]

user_df = spark.createDataFrame(data)
# Pivot once with two aggregations: avg for the numeric values and max as a
# stand-in for the mode of the categorical values. This produces columns
# named door_avg, door_max, speed_avg and speed_max.
df = user_df.groupBy('id','time')\
            .pivot('category')\
            .agg(F.avg('value').alias('avg'), F.max('value').alias('max'))

# Pair each *_avg column with its *_max counterpart.
expr1 = [x for x in df.columns if '_avg' in x]
expr2 = [x for x in df.columns if '_max' in x]
expr = zip(expr1, expr2)

# avg is null for the categorical columns, so coalesce falls back to max;
# splitting on '_' recovers the original category name for the alias.
sel_expr = [F.coalesce(x[0], x[1]).alias(x[0].split('_')[0]) for x in expr]

df_final = df.select('id','time',*sel_expr).orderBy('id','time')

df_final.show()
+---+----+----+-----+
| id|time|door|speed|
+---+----+----+-----+
|  1|   1|null| 55.0|
|  1|   2|open| 75.0|
|  2|  10|null| 30.0|
|  2|  11|open| null|
|  2|  12|open| null|
|  2|  13|null| 45.0|
+---+----+----+-----+
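
Note that max is only a stand-in for the mode; it gives the right answer on this data because 'open' is both the most frequent and the lexicographically largest door value. If you are on Spark 3.4 or later, pyspark.sql.functions.mode is available and you can compute the true mode instead — a minimal sketch of that variant, reusing user_df from above:

# F.mode requires Spark 3.4+
df2 = user_df.groupBy('id', 'time')\
             .pivot('category')\
             .agg(F.avg('value').alias('avg'), F.mode('value').alias('mode'))

# Same coalesce trick as above: avg is null for the categorical columns,
# so the per-category mode fills in; category names are recovered from the
# pivoted column names.
cats = sorted({c.rsplit('_', 1)[0] for c in df2.columns if c not in ('id', 'time')})
sel_expr2 = [F.coalesce(c + '_avg', c + '_mode').alias(c) for c in cats]

df2.select('id', 'time', *sel_expr2).orderBy('id', 'time').show()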
